You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/02/21 08:50:47 UTC

apex-malhar git commit: APEXMALHAR-2381 Change WindowManager for performance issues in Kinesis Input Operator This change contains: 1) Change WindowManager for performance issues in Kinesis Input Operator. 2) Unit test for default WindowDataManager for Ki

Repository: apex-malhar
Updated Branches:
  refs/heads/master c5af27b1e -> 39ca1819a


APEXMALHAR-2381 Change WindowManager for performance issues in Kinesis Input Operator
This change contains:
1) Change WindowManager for performance issues in Kinesis Input Operator.
2) Unit test for the default WindowDataManager of KinesisInputOperator.
3) Fix: add the fasterxml (Jackson) dependencies; previously all unit tests were failing.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/39ca1819
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/39ca1819
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/39ca1819

Branch: refs/heads/master
Commit: 39ca1819a09e35eebc49ca66efe6efbe802a54e3
Parents: c5af27b
Author: deepak-narkhede <ma...@gmail.com>
Authored: Mon Feb 13 16:38:16 2017 +0530
Committer: deepak-narkhede <ma...@gmail.com>
Committed: Tue Feb 21 13:29:58 2017 +0530

----------------------------------------------------------------------
 contrib/pom.xml                                 | 15 ++++++++
 .../kinesis/AbstractKinesisInputOperator.java   |  7 ++--
 .../kinesis/KinesisInputOperatorTest.java       | 36 +++++++++++++++++---
 3 files changed, 52 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/39ca1819/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index cba98c9..4636ca3 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -664,5 +664,20 @@
       <version>2.0.0</version>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>2.7.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>2.7.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.7.0</version>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/39ca1819/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index 30ceadb..901aaa3 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -36,7 +36,6 @@ import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang3.StringUtils;
 
@@ -146,7 +145,11 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
 
   public AbstractKinesisInputOperator()
   {
-    windowDataManager = new FSWindowDataManager();
+    /*
+     * Application may override the windowDataManger behaviour but default
+     * would be NoopWindowDataManager.
+     */
+    windowDataManager = new WindowDataManager.NoopWindowDataManager();
     currentWindowRecoveryState = new HashMap<String, KinesisPair<String, Integer>>();
   }
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/39ca1819/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
index a79d03a..63bda70 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
@@ -33,13 +33,17 @@ import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.datatorrent.lib.testbench.CollectorTestSink;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 
-import com.datatorrent.api.*;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
-
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
 import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
 
 public class KinesisInputOperatorTest extends KinesisOperatorTestBase
 {
@@ -91,6 +95,30 @@ public class KinesisInputOperatorTest extends KinesisOperatorTestBase
     }
   }
 
+  @Test
+  public void testWindowDataManager() throws Exception
+  {
+    // Create DAG for testing.
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    KinesisStringInputOperator inputOperator = dag.addOperator("KinesisInput", new KinesisStringInputOperator()
+    {
+      @Override
+      public void deactivate()
+      {
+      }
+
+      @Override
+      public void teardown()
+      {
+      }
+    });
+    testMeta.operator = inputOperator;
+    Assert.assertTrue("Default behaviour of WindowDataManager changed",
+      (inputOperator.getWindowDataManager() instanceof WindowDataManager.NoopWindowDataManager));
+  }
+
   /**
    * Test AbstractKinesisSinglePortInputOperator (i.e. an input adapter for
    * Kinesis, consumer). This module receives data from an outside test