You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/05/25 20:43:57 UTC

apex-malhar git commit: APEXMALHAR-2471 Upgrading APEXCORE dependency to version 3.6.0

Repository: apex-malhar
Updated Branches:
  refs/heads/master 2493bcbf5 -> 8e44a9cab


APEXMALHAR-2471 Upgrading APEXCORE dependency to version 3.6.0


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8e44a9ca
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8e44a9ca
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8e44a9ca

Branch: refs/heads/master
Commit: 8e44a9cab014aa40e25d06ca324ef5fd70aec152
Parents: 2493bcb
Author: ajaygit158 <aj...@gmail.com>
Authored: Tue Apr 25 11:28:01 2017 +0530
Committer: ajaygit158 <aj...@gmail.com>
Committed: Thu May 25 11:31:45 2017 +0530

----------------------------------------------------------------------
 .../benchmark/WordCountOperator.java            |   2 +-
 .../HBaseTransactionalPutOperatorTest.java      | 131 ++-----------------
 library/pom.xml                                 |  31 +++++
 .../datatorrent/lib/math/RunningAverage.java    |   6 +-
 .../com/datatorrent/lib/stream/Counter.java     |   2 +-
 .../lib/io/fs/FileSplitterBaseTest.java         |   5 +-
 .../malhar/lib/dedup/DeduperOrderingTest.java   |   6 +-
 pom.xml                                         |   4 +-
 .../stream/api/impl/ApexStreamImplTest.java     |   8 +-
 9 files changed, 61 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
index 6e91482..8c55404 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
@@ -44,7 +44,7 @@ public class WordCountOperator<T> implements Operator
     @Override
     public void process(T tuple)
     {
-      count++;
+      WordCountOperator.this.count++;
     }
 
   };

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
index 3cdc1bf..665cd40 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
@@ -19,19 +19,19 @@
 package com.datatorrent.contrib.hbase;
 
 import java.io.IOException;
-import java.util.Collection;
 
-import org.apache.hadoop.hbase.client.Put;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.Attribute;
+import org.apache.hadoop.hbase.client.Put;
+
 import com.datatorrent.api.Attribute.AttributeMap;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Operator.ProcessingMode;
 
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
 /**
  *
  */
@@ -55,44 +55,9 @@ public class HBaseTransactionalPutOperatorTest {
       t1.setColFamily("colfam0");t1.setColName("street");t1.setRow("row1");t1.setColValue("ts");
       HBaseTuple t2=new HBaseTuple();
       t2.setColFamily("colfam0");t2.setColName("city");t2.setRow("row2");t2.setColValue("tc");
-      thop.setup(new OperatorContext() {
-
-        @Override
-        public <T> T getValue(Attribute<T> key) {
-          if(key.equals(PROCESSING_MODE)){
-            return (T) ProcessingMode.AT_LEAST_ONCE;
-          }
-          return key.defaultValue;
-        }
-
-        @Override
-        public AttributeMap getAttributes() {
-          return null;
-        }
-
-        @Override
-        public int getId() {
-          // TODO Auto-generated method stub
-          return 0;
-        }
-
-        @Override
-        public void setCounters(Object counters) {
-          // TODO Auto-generated method stub
-
-        }
-
-        @Override
-        public void sendMetrics(Collection<String> collection)
-        {
-        }
-
-        @Override
-        public int getWindowsFromCheckpoint()
-        {
-          return 0;
-        }
-      });
+      AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
+      attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE);
+      thop.setup(mockOperatorContext(0, attributeMap));
       thop.beginWindow(0);
       thop.input.process(t1);
       thop.input.process(t2);
@@ -125,44 +90,9 @@ public class HBaseTransactionalPutOperatorTest {
       t1.setColFamily("colfam0");t1.setColName("street");t1.setRow("row1");t1.setColValue("ts");
       HBaseTuple t2=new HBaseTuple();
       t2.setColFamily("colfam0");t2.setColName("city");t2.setRow("row2");t2.setColValue("tc");
-      thop.setup(new OperatorContext() {
-
-        @Override
-        public <T> T getValue(Attribute<T> key) {
-          if(key.equals(PROCESSING_MODE)){
-            return (T) ProcessingMode.AT_MOST_ONCE;
-          }
-          return key.defaultValue;
-        }
-
-        @Override
-        public AttributeMap getAttributes() {
-          return null;
-        }
-
-        @Override
-        public int getId() {
-          // TODO Auto-generated method stub
-          return 0;
-        }
-
-        @Override
-        public void setCounters(Object counters) {
-          // TODO Auto-generated method stub
-
-        }
-
-        @Override
-        public void sendMetrics(Collection<String> collection)
-        {
-        }
-
-        @Override
-        public int getWindowsFromCheckpoint()
-        {
-          return 0;
-        }
-      });
+      AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
+      attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_MOST_ONCE);
+      thop.setup(mockOperatorContext(0, attributeMap));
       thop.beginWindow(0);
       thop.input.process(t1);
       thop.input.process(t2);
@@ -199,46 +129,9 @@ public class HBaseTransactionalPutOperatorTest {
       t2.setColFamily("colfam0");t2.setColName("city");t2.setRow("row2");t2.setColValue("tc");
       thop.beginWindow(0);
       thop.input.process(t1);
-      thop.setup(new OperatorContext() {
-
-        @Override
-        public <T> T getValue(Attribute<T> key) {
-          if(key.equals(PROCESSING_MODE)){
-            return (T) ProcessingMode.AT_MOST_ONCE;
-          }
-          return key.defaultValue;
-        }
-
-        @Override
-        public AttributeMap getAttributes() {
-          return null;
-        }
-
-        @Override
-        public int getId() {
-          // TODO Auto-generated method stub
-          return 0;
-        }
-
-        @Override
-        public void setCounters(Object counters) {
-          // TODO Auto-generated method stub
-
-        }
-
-        @Override
-        public void sendMetrics(Collection<String> collection)
-        {
-        }
-
-        @Override
-        public int getWindowsFromCheckpoint()
-        {
-          return 0;
-        }
-      });
-
-
+      AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
+      attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_MOST_ONCE);
+      thop.setup(mockOperatorContext(0, attributeMap));
 
       thop.input.process(t2);
       thop.endWindow();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index 609d537..17908dd 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -366,6 +366,37 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>javax.servlet</groupId>
+      <artifactId>javax.servlet-api</artifactId>
+      <version>3.1.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.7.4</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.datatorrent</groupId>
+      <artifactId>netlet</artifactId>
+      <version>1.3.1</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
index 163f06b..e3b0edf 100644
--- a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
+++ b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
@@ -64,9 +64,9 @@ public class RunningAverage extends BaseOperator
     @Override
     public void process(Number tuple)
     {
-      double sum = (count * average) + tuple.doubleValue();
-      count++;
-      average = sum / count;
+      double sum = (RunningAverage.this.count * average) + tuple.doubleValue();
+      RunningAverage.this.count++;
+      average = sum / RunningAverage.this.count;
     }
   };
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/src/main/java/com/datatorrent/lib/stream/Counter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/stream/Counter.java b/library/src/main/java/com/datatorrent/lib/stream/Counter.java
index 8de2653..d227988 100644
--- a/library/src/main/java/com/datatorrent/lib/stream/Counter.java
+++ b/library/src/main/java/com/datatorrent/lib/stream/Counter.java
@@ -50,7 +50,7 @@ public class Counter implements Operator, Unifier<Integer>
     @Override
     public void process(Object tuple)
     {
-      count++;
+      Counter.this.count++;
     }
 
   };

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
index c982ee4..9ec709e 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
@@ -220,7 +220,6 @@ public class FileSplitterBaseTest
     @Override
     public void populateDAG(DAG dag, Configuration configuration)
     {
-      dag.setAttribute(DAG.APPLICATION_PATH, baseTestMeta.dataDirectory);
       MockFileInput fileInput = dag.addOperator("Input", new MockFileInput());
       fileInput.filePaths = baseTestMeta.filePaths;
 
@@ -246,8 +245,8 @@ public class FileSplitterBaseTest
       @Override
       public void process(FileSplitterInput.FileMetadata fileMetadata)
       {
-        count++;
-        LOG.debug("count {}", count);
+        MockReceiver.this.count++;
+        LOG.debug("count {}", MockReceiver.this.count);
       }
     };
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java
index b4f76a6..157f505 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java
@@ -130,7 +130,7 @@ public class DeduperOrderingTest
         if (pojo.getSequence() < prevSequence) {
           testFailed = true;
         }
-        count++;
+        Verifier.this.count++;
         prevSequence = pojo.sequence;
       }
     };
@@ -144,7 +144,7 @@ public class DeduperOrderingTest
         if (pojo.getSequence() < prevSequence) {
           testFailed = true;
         }
-        count++;
+        Verifier.this.count++;
         prevSequence = pojo.sequence;
       }
     };
@@ -158,7 +158,7 @@ public class DeduperOrderingTest
         if (pojo.getSequence() < prevSequence) {
           testFailed = true;
         }
-        count++;
+        Verifier.this.count++;
         prevSequence = pojo.sequence;
       }
     };

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index adc6de5..dc9ed8b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.apache.apex</groupId>
     <artifactId>apex</artifactId>
-    <version>3.4.0</version>
+    <version>3.6.0</version>
   </parent>
 
   <groupId>org.apache.apex</groupId>
@@ -49,7 +49,7 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <maven.deploy.skip>false</maven.deploy.skip>
-    <apex.core.version>3.4.0</apex.core.version>
+    <apex.core.version>3.6.0</apex.core.version>
     <semver.plugin.skip>false</semver.plugin.skip>
     <surefire.args>-Xmx2048m</surefire.args>
   </properties>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
index 99d5ca6..5cc5d98 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
@@ -32,6 +32,7 @@ import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
 
 import static org.apache.apex.malhar.stream.api.Option.Options.name;
 
@@ -79,8 +80,11 @@ public class ApexStreamImplTest
 
       // Assert the stream is from first operator to second operator
       Assert.assertEquals("first", stream.getSource().getOperatorMeta().getName());
-      Assert.assertTrue(1 == stream.getSinks().size());
-      Assert.assertEquals("second", stream.getSinks().get(0).getOperatorWrapper().getName());
+      Collection<InputPortMeta> portMetaCollection = stream.getSinks();
+      Assert.assertTrue(1 == portMetaCollection.size());
+      for (InputPortMeta inputPortMeta : portMetaCollection) {
+        Assert.assertEquals("second", inputPortMeta.getOperatorMeta().getName());
+      }
 
       // Assert the stream is thread local
       Assert.assertTrue(stream.getLocality() == DAG.Locality.THREAD_LOCAL);