You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/09/20 15:45:44 UTC
[1/4] apex-malhar git commit: APEXMALHAR-2230 simplify the kafka
input operator test
Repository: apex-malhar
Updated Branches:
refs/heads/master 0c4b3fce2 -> 7b2d7e3d9
APEXMALHAR-2230 simplify the kafka input operator test
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5909dfdc
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5909dfdc
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5909dfdc
Branch: refs/heads/master
Commit: 5909dfdc491fdca0cea7eca56fe72b8e1d32bcc3
Parents: 9f9da0e
Author: Siyuan Hua <hs...@apache.org>
Authored: Thu Sep 15 08:31:34 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Thu Sep 15 08:31:34 2016 -0700
----------------------------------------------------------------------
.../malhar/kafka/KafkaInputOperatorTest.java | 32 ++------------------
1 file changed, 3 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5909dfdc/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 8440615..47a374b 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -137,10 +137,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
private static List<String> tupleCollection = new LinkedList<>();
- /**
- * whether countDown latch count all tuples or just END_TUPLE
- */
- private static final boolean countDownAll = false;
private static final int scale = 2;
private static final int totalCount = 10 * scale;
private static final int failureTrigger = 3 * scale;
@@ -232,33 +228,11 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
int tupleSize = windowTupleCollector.size();
tupleCollection.addAll(windowTupleCollector);
- int countDownTupleSize = countDownAll ? tupleSize : endTuples;
+ int countDownTupleSize = endTuples;
if (latch != null) {
- Assert.assertTrue("received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
- while (countDownTupleSize > 0) {
+ while (countDownTupleSize-- > 0) {
latch.countDown();
- --countDownTupleSize;
- }
- if (latch.getCount() == 0) {
- /**
- * The time before countDown() and the shutdown() of the application
- * will cause fatal error:
- * "Catastrophic Error: Invalid State - the operator blocked forever!"
- * as the activeQueues could be cleared but alive haven't changed yet.
- * throw the ShutdownException to let the engine shutdown;
- */
- try {
- throw new ShutdownException();
- //lc.shutdown();
- } finally {
- /**
- * interrupt the engine thread, let it wake from sleep and handle
- * the shutdown at this time, all payload should be handled. so it
- * should be ok to interrupt
- */
- monitorThread.interrupt();
- }
}
}
}
@@ -301,7 +275,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception
{
// each broker should get a END_TUPLE message
- latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);
+ latch = new CountDownLatch(totalBrokers);
logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {}; hasMultiPartition: {}, partition: {}",
testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);
[4/4] apex-malhar git commit: Merge branch 'APEXMALHAR-2245' of
https://github.com/davidyan74/incubator-apex-malhar
Posted by hs...@apache.org.
Merge branch 'APEXMALHAR-2245' of https://github.com/davidyan74/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7b2d7e3d
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7b2d7e3d
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7b2d7e3d
Branch: refs/heads/master
Commit: 7b2d7e3d90c7dc1befa8d69553928fcdc2931367
Parents: efe5876 447b26d
Author: Siyuan Hua <hs...@apache.org>
Authored: Tue Sep 20 08:45:04 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Tue Sep 20 08:45:04 2016 -0700
----------------------------------------------------------------------
.../lib/state/spillable/WindowBoundedMapCache.java | 13 +++++--------
1 file changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
[3/4] apex-malhar git commit: Merge branch 'master' of
github.com:apache/incubator-apex-malhar
Posted by hs...@apache.org.
Merge branch 'master' of github.com:apache/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/efe58763
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/efe58763
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/efe58763
Branch: refs/heads/master
Commit: efe587634c64b650dd4f4a3339a27a57ca6b395c
Parents: 5909dfd 0c4b3fc
Author: Siyuan Hua <hs...@apache.org>
Authored: Tue Sep 20 08:43:40 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Tue Sep 20 08:43:40 2016 -0700
----------------------------------------------------------------------
.../state/ManagedStateBenchmarkApp.java | 21 +--
.../benchmark/state/StoreOperator.java | 141 ++++++++++++++++++-
.../src/main/resources/META-INF/properties.xml | 5 +
.../state/ManagedStateBenchmarkAppTester.java | 37 ++++-
4 files changed, 186 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
[2/4] apex-malhar git commit: APEXMALHAR-2245 #resolve Add the key in
removedKeys even if the key does not appear in the cache
Posted by hs...@apache.org.
APEXMALHAR-2245 #resolve Add the key in removedKeys even if the key does not appear in the cache
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/447b26df
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/447b26df
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/447b26df
Branch: refs/heads/master
Commit: 447b26df0f73a29e19d894663c92fd99e6384420
Parents: 9f9da0e
Author: David Yan <da...@datatorrent.com>
Authored: Mon Sep 19 17:13:22 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Mon Sep 19 17:13:22 2016 -0700
----------------------------------------------------------------------
.../lib/state/spillable/WindowBoundedMapCache.java | 13 +++++--------
1 file changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/447b26df/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
index 6db1f1a..0e1d55e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
@@ -87,15 +87,12 @@ public class WindowBoundedMapCache<K, V>
public void remove(K key)
{
Preconditions.checkNotNull(key);
-
- if (!cache.containsKey(key)) {
- return;
- }
-
- cache.remove(key);
- changedKeys.remove(key);
removedKeys.add(key);
- priorityQueue.remove(key);
+ if (cache.containsKey(key)) {
+ cache.remove(key);
+ changedKeys.remove(key);
+ priorityQueue.remove(key);
+ }
}
public Set<K> getChangedKeys()