You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/07/14 10:55:40 UTC
ignite git commit: wip on data streamer
Repository: ignite
Updated Branches:
refs/heads/ignite-5658 [created] 6fc5ad90f
wip on data streamer
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6fc5ad90
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6fc5ad90
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6fc5ad90
Branch: refs/heads/ignite-5658
Commit: 6fc5ad90fa5af84e1f30d3efd669d2fade10f83d
Parents: 7338445
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Jul 14 13:55:13 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Jul 14 13:55:13 2017 +0300
----------------------------------------------------------------------
.../datastreamer/DataStreamerImpl.java | 236 +++++++++++++------
.../DataStreamProcessorSelfTest.java | 93 ++++----
2 files changed, 213 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6fc5ad90/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index df51fac..c25a5ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1310,12 +1310,15 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** Active futures. */
private final Collection<IgniteInternalFuture<Object>> locFuts;
+// /** Buffered entries. */
+// private List<DataStreamerEntry> entries;
/** Buffered entries. */
- private List<DataStreamerEntry> entries;
+ private final ConcurrentMap<Integer, PerPartitionBuffer> entriesMap =
+ new ConcurrentHashMap8<>();
- /** */
- @GridToStringExclude
- private GridFutureAdapter<Object> curFut;
+// /** */
+// @GridToStringExclude
+// private GridFutureAdapter<Object> curFut;
/** Local node flag. */
private final boolean isLocNode;
@@ -1329,8 +1332,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** */
private final Semaphore sem;
- /** Batch topology. */
- private AffinityTopologyVersion batchTopVer;
+// /** Batch topology. */
+// private AffinityTopologyVersion batchTopVer;
/** Closure to signal on task finish. */
@GridToStringExclude
@@ -1354,25 +1357,25 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
// Cache local node flag.
isLocNode = node.equals(ctx.discovery().localNode());
- entries = newEntries();
- curFut = new GridFutureAdapter<>();
- curFut.listen(signalC);
+// entries = newEntries();
+// curFut = new GridFutureAdapter<>();
+// curFut.listen(signalC);
sem = new Semaphore(parallelOps);
}
- /**
- * @param remap Remapping flag.
- */
- private void renewBatch(boolean remap) {
- entries = newEntries();
- curFut = new GridFutureAdapter<>();
-
- batchTopVer = null;
-
- if (!remap)
- curFut.listen(signalC);
- }
+// /**
+// * @param remap Remapping flag.
+// */
+// private void renewBatch(boolean remap) {
+// entries = newEntries();
+// curFut = new GridFutureAdapter<>();
+//
+// batchTopVer = null;
+//
+// if (!remap)
+// curFut.listen(signalC);
+// }
/**
* @param newEntries Infos.
@@ -1382,51 +1385,69 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* @return Future for operation.
* @throws IgniteInterruptedCheckedException If failed.
*/
- @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries,
+ @Nullable GridFutureAdapter<?> update(
+ Iterable<DataStreamerEntry> newEntries,
AffinityTopologyVersion topVer,
IgniteInClosure<IgniteInternalFuture<?>> lsnr,
- boolean remap) throws IgniteInterruptedCheckedException {
- List<DataStreamerEntry> entries0 = null;
+ boolean remap
+ ) throws IgniteInterruptedCheckedException {
+ GridFutureAdapter<Object> curFut0 = null;
- GridFutureAdapter<Object> curFut0;
+ for (DataStreamerEntry entry : newEntries) {
+ List<DataStreamerEntry> entries0 = null;
+ AffinityTopologyVersion curBatchTopVer;
+
+ // Init buffer.
+ int part = entry.getKey().partition();
+
+ PerPartitionBuffer b = entriesMap.get(part);
- AffinityTopologyVersion curBatchTopVer;
+ if (b == null) {
+ PerPartitionBuffer old =
+ entriesMap.putIfAbsent(part, b = new PerPartitionBuffer(part, signalC));
+
+ if (old != null)
+ b = old;
+ }
- synchronized (this) {
- curFut0 = curFut;
+ synchronized (b) {
+ curFut0 = b.curFut;
- curFut0.listen(lsnr);
+ // TODO: think over proper listener model:
+ // Listener should be added only once per whole entries collection.
+ // Should we simplify the model and get rid of all futures?
+ curFut0.listen(lsnr);
- if (batchTopVer == null)
- batchTopVer = topVer;
+ if (b.batchTopVer == null)
+ b.batchTopVer = topVer;
- curBatchTopVer = batchTopVer;
+ curBatchTopVer = b.batchTopVer;
- for (DataStreamerEntry entry : newEntries)
- entries.add(entry);
+ b.entries.add(entry);
- if (entries.size() >= bufSize) {
- entries0 = entries;
+ if (b.entries.size() >= bufSize) {
+ entries0 = b.entries;
- renewBatch(remap);
+ b.renewBatch(remap);
+ }
}
- }
- if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) {
- renewBatch(remap);
+ if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) {
+ b.renewBatch(remap);
- curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." +
- "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]"));
- }
- else if (entries0 != null) {
- submit(entries0, curBatchTopVer, curFut0, remap);
-
- if (cancelled)
- curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
- DataStreamerImpl.this));
- else if (ctx.clientDisconnected())
- curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
- "Client node disconnected."));
+ curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." +
+ "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]"));
+ }
+ else if (entries0 != null) {
+ submit(entries0, curBatchTopVer, curFut0, remap);
+
+ if (cancelled)
+ curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+ DataStreamerImpl.this));
+ else if (ctx.clientDisconnected())
+ curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Client node disconnected."));
+ }
}
return curFut0;
@@ -1449,19 +1470,22 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
acquireRemapSemaphore();
- synchronized (this) {
- if (!entries.isEmpty()) {
- entries0 = entries;
- curFut0 = curFut;
+ for (PerPartitionBuffer b : entriesMap.values()) {
+ AffinityTopologyVersion batchTopVer = null;
- entries = newEntries();
- curFut = new GridFutureAdapter<>();
- curFut.listen(signalC);
+ synchronized (b) {
+ if (!b.entries.isEmpty()) {
+ entries0 = b.entries;
+ curFut0 = b.curFut;
+ batchTopVer = b.batchTopVer;
+
+ b.renewBatch(false);
+ }
}
- }
- if (entries0 != null)
- submit(entries0, batchTopVer, curFut0, false);
+ if (entries0 != null)
+ submit(entries0, batchTopVer, curFut0, false);
+ }
// Create compound future for this flush.
GridCompoundFuture<Object, Object> res = null;
@@ -1492,8 +1516,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* @throws IgniteInterruptedCheckedException If thread has been interrupted.
*/
private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
- if (timeout == DFLT_UNLIMIT_TIMEOUT)
+ if (timeout == DFLT_UNLIMIT_TIMEOUT) {
+ if (sem.availablePermits() <= 0)
+ U.debug(log, "No Permits node=" + node.order() + " " + node.isLocal() + " " + isLocNode);
U.acquire(sem);
+ }
else if (!U.tryAcquire(sem, timeout, TimeUnit.MILLISECONDS)) {
if (log.isDebugEnabled())
log.debug("Failed to add parallel operation.");
@@ -1508,7 +1535,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
private void signalTaskFinished(IgniteInternalFuture<Object> f) {
assert f != null;
+
sem.release();
+ U.debug(log, "Released: " + sem.availablePermits() + " node=" + node.order());
}
/**
@@ -1603,10 +1632,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* @param remap Remapping flag.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
- private void submit(final Collection<DataStreamerEntry> entries,
+ private void submit(
+ final Collection<DataStreamerEntry> entries,
@Nullable AffinityTopologyVersion topVer,
final GridFutureAdapter<Object> curFut,
- boolean remap)
+ boolean remap
+ )
throws IgniteInterruptedCheckedException {
assert entries != null;
assert !entries.isEmpty();
@@ -1750,11 +1781,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
// Make sure to complete current future.
GridFutureAdapter<Object> curFut0;
- synchronized (this) {
- curFut0 = curFut;
- }
+ for (PerPartitionBuffer b : entriesMap.values()) {
+ synchronized (b) {
+ curFut0 = b.curFut;
+ }
- curFut0.onDone(e);
+ curFut0.onDone(e);
+ }
}
/**
@@ -1828,11 +1861,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** {@inheritDoc} */
@Override public String toString() {
- int size;
+ int size = -1;
- synchronized (this) {
- size = entries.size();
- }
+ // TODO
+// synchronized (this) {
+// size = entries.size();
+// }
return S.toString(Buffer.class, this,
"entriesCnt", size,
@@ -2080,4 +2114,60 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
return S.toString(KeyCacheObjectWrapper.class, this);
}
}
+
+ /**
+ *
+ */
+ private class PerPartitionBuffer {
+ /** */
+ private final int partId;
+
+ /** */
+ private List<DataStreamerEntry> entries;
+
+ /** */
+ private GridFutureAdapter<Object> curFut;
+
+ /** Batch topology. */
+ private AffinityTopologyVersion batchTopVer;
+
+ /** */
+ private final IgniteInClosure<? super IgniteInternalFuture<Object>> signalC;
+
+ /**
+ * @param partId Partition ID.
+ * @param c Signal closure.
+ */
+ public PerPartitionBuffer(
+ int partId,
+ IgniteInClosure<? super IgniteInternalFuture<Object>> c
+ ) {
+ this.partId = partId;
+ signalC = c;
+
+ renewBatch(false);
+ }
+
+ synchronized void renewBatch(boolean remap) {
+ entries = newEntries();
+ curFut = new GridFutureAdapter<>();
+
+ batchTopVer = null;
+
+ if (!remap)
+ curFut.listen(signalC);
+ }
+
+ /**
+ * @return Fresh collection with some space for outgrowth.
+ */
+ private List<DataStreamerEntry> newEntries() {
+ return new ArrayList<>((int)(bufSize * 1.2));
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PerPartitionBuffer.class, this, super.toString());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6fc5ad90/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index ec5e6d0..e33d811 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -473,7 +473,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
final AtomicBoolean done = new AtomicBoolean();
try {
- final int totalPutCnt = 50000;
+ final int totalPutCnt = 10000;
IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -491,60 +491,67 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
futs.add(ldr.addData(idx, idx));
}
- ldr.flush();
+// ldr.flush();
- for (IgniteFuture<?> fut : futs)
- fut.get();
+// for (IgniteFuture<?> fut : futs) {
+// info("Before: " + fut);
+//
+// fut.get();
+//
+// info("After.");
+// }
return null;
}
}, 5, "producer");
- IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- while (!done.get()) {
- ldr.flush();
-
- U.sleep(100);
- }
-
- return null;
- }
- }, 1, "flusher");
+// IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
+// @Override public Object call() throws Exception {
+// while (!done.get()) {
+// ldr.flush();
+//
+// U.sleep(100);
+// }
+//
+// return null;
+// }
+// }, 1, "flusher");
// Define index of node being restarted.
final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1;
- IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- try {
- for (int i = 0; i < 5; i++) {
- Ignite g = startGrid(restartNodeIdx);
-
- UUID id = g.cluster().localNode().id();
-
- info(">>>>>>> Started node: " + id);
-
- U.sleep(1000);
-
- stopGrid(getTestIgniteInstanceName(restartNodeIdx), true);
-
- info(">>>>>>> Stopped node: " + id);
- }
- }
- finally {
- done.set(true);
-
- info("Start stop thread finished.");
- }
-
- return null;
- }
- }, 1, "start-stop-thread");
+ //TODO Uncomment
+
+// IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() {
+// @Override public Object call() throws Exception {
+// try {
+// for (int i = 0; i < 5; i++) {
+// Ignite g = startGrid(restartNodeIdx);
+//
+// UUID id = g.cluster().localNode().id();
+//
+// info(">>>>>>> Started node: " + id);
+//
+// U.sleep(1000);
+//
+// stopGrid(getTestIgniteInstanceName(restartNodeIdx), true);
+//
+// info(">>>>>>> Stopped node: " + id);
+// }
+// }
+// finally {
+// done.set(true);
+//
+// info("Start stop thread finished.");
+// }
+//
+// return null;
+// }
+// }, 1, "start-stop-thread");
fut1.get();
- fut2.get();
- fut3.get();
+// fut2.get();
+// fut3.get();
}
finally {
ldr.close(false);