You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/09/12 08:26:57 UTC
[43/46] ignite git commit: IGNITE-3828: Added wrapper for DataStreamerImpl keys to minimize impact of hash code collisions. This closes #1034.
IGNITE-3828: Added wrapper for DataStreamerImpl keys to minimize impact of hash code collisions. This closes #1034.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c3993d9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c3993d9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c3993d9
Branch: refs/heads/master
Commit: 6c3993d9d4b2126a4ef9699fdb3c0d296b03dea7
Parents: 65c92fa
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Fri Sep 9 13:09:40 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Sep 9 13:09:40 2016 +0300
----------------------------------------------------------------------
.../datastreamer/DataStreamerImpl.java | 69 +++++++++++++++-----
1 file changed, 51 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c3993d9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index a3bae24..05e6488 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
import javax.cache.expiry.ExpiryPolicy;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -139,7 +140,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** Cache name ({@code null} for default cache). */
private final String cacheName;
-
/** Per-node buffer size. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
@@ -512,7 +512,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
activeFuts.add(resFut);
- Collection<KeyCacheObject> keys =
+ Collection<KeyCacheObjectWrapper> keys =
new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
Collection<DataStreamerEntry> entries0 = new ArrayList<>(entries.size());
@@ -521,7 +521,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, entry.getKey(), true);
CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, entry.getValue(), true);
- keys.add(key);
+ keys.add(new KeyCacheObjectWrapper(key));
entries0.add(new DataStreamerEntry(key, val));
}
@@ -572,13 +572,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
activeFuts.add(resFut);
- Collection<KeyCacheObject> keys = null;
+ Collection<KeyCacheObjectWrapper> keys = null;
if (entries.size() > 1) {
keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
for (DataStreamerEntry entry : entries)
- keys.add(entry.getKey());
+ keys.add(new KeyCacheObjectWrapper(entry.getKey()));
}
load0(entries, resFut, keys, 0);
@@ -641,7 +641,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
private void load0(
Collection<? extends DataStreamerEntry> entries,
final GridFutureAdapter<Object> resFut,
- @Nullable final Collection<KeyCacheObject> activeKeys,
+ @Nullable final Collection<KeyCacheObjectWrapper> activeKeys,
final int remaps
) {
assert entries != null;
@@ -729,7 +729,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
if (activeKeys != null) {
for (DataStreamerEntry e : entriesForNode)
- activeKeys.remove(e.getKey());
+ activeKeys.remove(new KeyCacheObjectWrapper(e.getKey()));
if (activeKeys.isEmpty())
resFut.onDone();
@@ -1103,7 +1103,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* @throws org.apache.ignite.plugin.security.SecurityException If permissions are not enough for streaming.
*/
private void checkSecurityPermission(SecurityPermission perm)
- throws org.apache.ignite.plugin.security.SecurityException{
+ throws org.apache.ignite.plugin.security.SecurityException {
if (!ctx.security().enabled())
return;
@@ -1172,8 +1172,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* @param newEntries Infos.
* @param topVer Topology version.
* @param lsnr Listener for the operation future.
- * @throws IgniteInterruptedCheckedException If failed.
* @return Future for operation.
+ * @throws IgniteInterruptedCheckedException If failed.
*/
@Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries,
AffinityTopologyVersion topVer,
@@ -1221,7 +1221,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/**
* @return Future if any submitted.
- *
* @throws IgniteInterruptedCheckedException If thread has been interrupted.
*/
@Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
@@ -1273,13 +1272,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
if (timeout == DFLT_UNLIMIT_TIMEOUT)
U.acquire(sem);
- else
- if (!U.tryAcquire(sem, timeout, TimeUnit.MILLISECONDS)) {
- if (log.isDebugEnabled())
- log.debug("Failed to add parallel operation.");
+ else if (!U.tryAcquire(sem, timeout, TimeUnit.MILLISECONDS)) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to add parallel operation.");
- throw new IgniteDataStreamerTimeoutException("Data streamer exceeded timeout when starts parallel operation.");
- }
+ throw new IgniteDataStreamerTimeoutException("Data streamer exceeded timeout when starts parallel operation.");
+ }
}
/**
@@ -1307,7 +1305,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
try {
incrementActiveTasks();
- } catch (IgniteDataStreamerTimeoutException e) {
+ }
+ catch (IgniteDataStreamerTimeoutException e) {
curFut.onDone(e);
throw e;
}
@@ -1574,7 +1573,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
if (depCls != null)
cls0 = depCls;
else {
- for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) {
+ for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext(); ) {
Object o = it.next();
if (o != null)
@@ -1696,4 +1695,38 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
return PUBLIC_POOL;
}
}
+
+ /**
+ * Wrapper for a {@link KeyCacheObject}: {@code equals} compares wrapped keys by reference and {@code hashCode} delegates to the key, which prevents slow down in case of hash code collision.
+ */
+ private static class KeyCacheObjectWrapper {
+ /** Wrapped key object; asserted to be non-null in the constructor. */
+ private final KeyCacheObject key;
+
+ /**
+ * Creates a wrapper around the given key.
+ *
+ * @param key Key object to wrap; asserted to be non-null.
+ */
+ KeyCacheObjectWrapper(KeyCacheObject key) {
+ assert key != null;
+
+ this.key = key;
+ }
+
+ /** Identity comparison: {@code true} only if {@code o} is a wrapper holding the very same key instance ({@code ==}). */
+ @Override public boolean equals(Object o) {
+ return o instanceof KeyCacheObjectWrapper && this.key == ((KeyCacheObjectWrapper)o).key;
+ }
+
+ /** Returns the hash code of the wrapped key. */
+ @Override public int hashCode() {
+ return key.hashCode();
+ }
+
+ /** Builds the string form via {@code S.toString} for this wrapper class. */
+ @Override public String toString() {
+ return S.toString(KeyCacheObjectWrapper.class, this);
+ }
+ }
}