You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/09/20 15:45:44 UTC

[1/4] apex-malhar git commit: APEXMALHAR-2230 simplify the kafka input operator test

Repository: apex-malhar
Updated Branches:
  refs/heads/master 0c4b3fce2 -> 7b2d7e3d9


APEXMALHAR-2230 simplify the kafka input operator test


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5909dfdc
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5909dfdc
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5909dfdc

Branch: refs/heads/master
Commit: 5909dfdc491fdca0cea7eca56fe72b8e1d32bcc3
Parents: 9f9da0e
Author: Siyuan Hua <hs...@apache.org>
Authored: Thu Sep 15 08:31:34 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Thu Sep 15 08:31:34 2016 -0700

----------------------------------------------------------------------
 .../malhar/kafka/KafkaInputOperatorTest.java    | 32 ++------------------
 1 file changed, 3 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5909dfdc/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 8440615..47a374b 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -137,10 +137,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
   private static List<String> tupleCollection = new LinkedList<>();
 
-  /**
-   * whether countDown latch count all tuples or just END_TUPLE
-   */
-  private static final boolean countDownAll = false;
   private static final int scale = 2;
   private static final int totalCount = 10 * scale;
   private static final int failureTrigger = 3 * scale;
@@ -232,33 +228,11 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
       int tupleSize = windowTupleCollector.size();
       tupleCollection.addAll(windowTupleCollector);
       
-      int countDownTupleSize = countDownAll ? tupleSize : endTuples;
+      int countDownTupleSize = endTuples;
 
       if (latch != null) {
-        Assert.assertTrue("received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
-        while (countDownTupleSize > 0) {
+        while (countDownTupleSize-- > 0) {
             latch.countDown();
-            --countDownTupleSize;
-        }
-        if (latch.getCount() == 0) {
-          /**
-           * The time before countDown() and the shutdown() of the application
-           * will cause fatal error:
-           * "Catastrophic Error: Invalid State - the operator blocked forever!"
-           * as the activeQueues could be cleared but alive haven't changed yet.
-           * throw the ShutdownException to let the engine shutdown;
-           */
-          try {
-            throw new ShutdownException();
-            //lc.shutdown();
-          } finally {
-            /**
-             * interrupt the engine thread, let it wake from sleep and handle
-             * the shutdown at this time, all payload should be handled. so it
-             * should be ok to interrupt
-             */
-            monitorThread.interrupt();
-          }
         }
       }
     }
@@ -301,7 +275,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
   public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception
   {
     // each broker should get a END_TUPLE message
-    latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);
+    latch = new CountDownLatch(totalBrokers);
 
     logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {}; hasMultiPartition: {}, partition: {}", 
         testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition); 


[4/4] apex-malhar git commit: Merge branch 'APEXMALHAR-2245' of https://github.com/davidyan74/incubator-apex-malhar

Posted by hs...@apache.org.
Merge branch 'APEXMALHAR-2245' of https://github.com/davidyan74/incubator-apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7b2d7e3d
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7b2d7e3d
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7b2d7e3d

Branch: refs/heads/master
Commit: 7b2d7e3d90c7dc1befa8d69553928fcdc2931367
Parents: efe5876 447b26d
Author: Siyuan Hua <hs...@apache.org>
Authored: Tue Sep 20 08:45:04 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Tue Sep 20 08:45:04 2016 -0700

----------------------------------------------------------------------
 .../lib/state/spillable/WindowBoundedMapCache.java     | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------



[3/4] apex-malhar git commit: Merge branch 'master' of github.com:apache/incubator-apex-malhar

Posted by hs...@apache.org.
Merge branch 'master' of github.com:apache/incubator-apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/efe58763
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/efe58763
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/efe58763

Branch: refs/heads/master
Commit: efe587634c64b650dd4f4a3339a27a57ca6b395c
Parents: 5909dfd 0c4b3fc
Author: Siyuan Hua <hs...@apache.org>
Authored: Tue Sep 20 08:43:40 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Tue Sep 20 08:43:40 2016 -0700

----------------------------------------------------------------------
 .../state/ManagedStateBenchmarkApp.java         |  21 +--
 .../benchmark/state/StoreOperator.java          | 141 ++++++++++++++++++-
 .../src/main/resources/META-INF/properties.xml  |   5 +
 .../state/ManagedStateBenchmarkAppTester.java   |  37 ++++-
 4 files changed, 186 insertions(+), 18 deletions(-)
----------------------------------------------------------------------



[2/4] apex-malhar git commit: APEXMALHAR-2245 #resolve Add the key in removedKeys even if the key does not appear in the cache

Posted by hs...@apache.org.
APEXMALHAR-2245 #resolve Add the key in removedKeys even if the key does not appear in the cache


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/447b26df
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/447b26df
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/447b26df

Branch: refs/heads/master
Commit: 447b26df0f73a29e19d894663c92fd99e6384420
Parents: 9f9da0e
Author: David Yan <da...@datatorrent.com>
Authored: Mon Sep 19 17:13:22 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Mon Sep 19 17:13:22 2016 -0700

----------------------------------------------------------------------
 .../lib/state/spillable/WindowBoundedMapCache.java     | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/447b26df/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
index 6db1f1a..0e1d55e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
@@ -87,15 +87,12 @@ public class WindowBoundedMapCache<K, V>
   public void remove(K key)
   {
     Preconditions.checkNotNull(key);
-
-    if (!cache.containsKey(key)) {
-      return;
-    }
-
-    cache.remove(key);
-    changedKeys.remove(key);
     removedKeys.add(key);
-    priorityQueue.remove(key);
+    if (cache.containsKey(key)) {
+      cache.remove(key);
+      changedKeys.remove(key);
+      priorityQueue.remove(key);
+    }
   }
 
   public Set<K> getChangedKeys()