You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/05/25 20:43:57 UTC
apex-malhar git commit: APEXMALHAR-2471 Upgrading APEXCORE dependency to version 3.6.0
Repository: apex-malhar
Updated Branches:
refs/heads/master 2493bcbf5 -> 8e44a9cab
APEXMALHAR-2471 Upgrading APEXCORE dependency to version 3.6.0
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8e44a9ca
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8e44a9ca
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8e44a9ca
Branch: refs/heads/master
Commit: 8e44a9cab014aa40e25d06ca324ef5fd70aec152
Parents: 2493bcb
Author: ajaygit158 <aj...@gmail.com>
Authored: Tue Apr 25 11:28:01 2017 +0530
Committer: ajaygit158 <aj...@gmail.com>
Committed: Thu May 25 11:31:45 2017 +0530
----------------------------------------------------------------------
.../benchmark/WordCountOperator.java | 2 +-
.../HBaseTransactionalPutOperatorTest.java | 131 ++-----------------
library/pom.xml | 31 +++++
.../datatorrent/lib/math/RunningAverage.java | 6 +-
.../com/datatorrent/lib/stream/Counter.java | 2 +-
.../lib/io/fs/FileSplitterBaseTest.java | 5 +-
.../malhar/lib/dedup/DeduperOrderingTest.java | 6 +-
pom.xml | 4 +-
.../stream/api/impl/ApexStreamImplTest.java | 8 +-
9 files changed, 61 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
index 6e91482..8c55404 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
@@ -44,7 +44,7 @@ public class WordCountOperator<T> implements Operator
@Override
public void process(T tuple)
{
- count++;
+ WordCountOperator.this.count++;
}
};
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
index 3cdc1bf..665cd40 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
@@ -19,19 +19,19 @@
package com.datatorrent.contrib.hbase;
import java.io.IOException;
-import java.util.Collection;
-import org.apache.hadoop.hbase.client.Put;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.api.Attribute;
+import org.apache.hadoop.hbase.client.Put;
+
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Operator.ProcessingMode;
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
/**
*
*/
@@ -55,44 +55,9 @@ public class HBaseTransactionalPutOperatorTest {
t1.setColFamily("colfam0");t1.setColName("street");t1.setRow("row1");t1.setColValue("ts");
HBaseTuple t2=new HBaseTuple();
t2.setColFamily("colfam0");t2.setColName("city");t2.setRow("row2");t2.setColValue("tc");
- thop.setup(new OperatorContext() {
-
- @Override
- public <T> T getValue(Attribute<T> key) {
- if(key.equals(PROCESSING_MODE)){
- return (T) ProcessingMode.AT_LEAST_ONCE;
- }
- return key.defaultValue;
- }
-
- @Override
- public AttributeMap getAttributes() {
- return null;
- }
-
- @Override
- public int getId() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public void setCounters(Object counters) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void sendMetrics(Collection<String> collection)
- {
- }
-
- @Override
- public int getWindowsFromCheckpoint()
- {
- return 0;
- }
- });
+ AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
+ attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE);
+ thop.setup(mockOperatorContext(0, attributeMap));
thop.beginWindow(0);
thop.input.process(t1);
thop.input.process(t2);
@@ -125,44 +90,9 @@ public class HBaseTransactionalPutOperatorTest {
t1.setColFamily("colfam0");t1.setColName("street");t1.setRow("row1");t1.setColValue("ts");
HBaseTuple t2=new HBaseTuple();
t2.setColFamily("colfam0");t2.setColName("city");t2.setRow("row2");t2.setColValue("tc");
- thop.setup(new OperatorContext() {
-
- @Override
- public <T> T getValue(Attribute<T> key) {
- if(key.equals(PROCESSING_MODE)){
- return (T) ProcessingMode.AT_MOST_ONCE;
- }
- return key.defaultValue;
- }
-
- @Override
- public AttributeMap getAttributes() {
- return null;
- }
-
- @Override
- public int getId() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public void setCounters(Object counters) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void sendMetrics(Collection<String> collection)
- {
- }
-
- @Override
- public int getWindowsFromCheckpoint()
- {
- return 0;
- }
- });
+ AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
+ attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_MOST_ONCE);
+ thop.setup(mockOperatorContext(0, attributeMap));
thop.beginWindow(0);
thop.input.process(t1);
thop.input.process(t2);
@@ -199,46 +129,9 @@ public class HBaseTransactionalPutOperatorTest {
t2.setColFamily("colfam0");t2.setColName("city");t2.setRow("row2");t2.setColValue("tc");
thop.beginWindow(0);
thop.input.process(t1);
- thop.setup(new OperatorContext() {
-
- @Override
- public <T> T getValue(Attribute<T> key) {
- if(key.equals(PROCESSING_MODE)){
- return (T) ProcessingMode.AT_MOST_ONCE;
- }
- return key.defaultValue;
- }
-
- @Override
- public AttributeMap getAttributes() {
- return null;
- }
-
- @Override
- public int getId() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public void setCounters(Object counters) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void sendMetrics(Collection<String> collection)
- {
- }
-
- @Override
- public int getWindowsFromCheckpoint()
- {
- return 0;
- }
- });
-
-
+ AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
+ attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_MOST_ONCE);
+ thop.setup(mockOperatorContext(0, attributeMap));
thop.input.process(t2);
thop.endWindow();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index 609d537..17908dd 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -366,6 +366,37 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ <version>3.1.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.7.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.datatorrent</groupId>
+ <artifactId>netlet</artifactId>
+ <version>1.3.1</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
index 163f06b..e3b0edf 100644
--- a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
+++ b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
@@ -64,9 +64,9 @@ public class RunningAverage extends BaseOperator
@Override
public void process(Number tuple)
{
- double sum = (count * average) + tuple.doubleValue();
- count++;
- average = sum / count;
+ double sum = (RunningAverage.this.count * average) + tuple.doubleValue();
+ RunningAverage.this.count++;
+ average = sum / RunningAverage.this.count;
}
};
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/src/main/java/com/datatorrent/lib/stream/Counter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/stream/Counter.java b/library/src/main/java/com/datatorrent/lib/stream/Counter.java
index 8de2653..d227988 100644
--- a/library/src/main/java/com/datatorrent/lib/stream/Counter.java
+++ b/library/src/main/java/com/datatorrent/lib/stream/Counter.java
@@ -50,7 +50,7 @@ public class Counter implements Operator, Unifier<Integer>
@Override
public void process(Object tuple)
{
- count++;
+ Counter.this.count++;
}
};
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
index c982ee4..9ec709e 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
@@ -220,7 +220,6 @@ public class FileSplitterBaseTest
@Override
public void populateDAG(DAG dag, Configuration configuration)
{
- dag.setAttribute(DAG.APPLICATION_PATH, baseTestMeta.dataDirectory);
MockFileInput fileInput = dag.addOperator("Input", new MockFileInput());
fileInput.filePaths = baseTestMeta.filePaths;
@@ -246,8 +245,8 @@ public class FileSplitterBaseTest
@Override
public void process(FileSplitterInput.FileMetadata fileMetadata)
{
- count++;
- LOG.debug("count {}", count);
+ MockReceiver.this.count++;
+ LOG.debug("count {}", MockReceiver.this.count);
}
};
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java
index b4f76a6..157f505 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java
@@ -130,7 +130,7 @@ public class DeduperOrderingTest
if (pojo.getSequence() < prevSequence) {
testFailed = true;
}
- count++;
+ Verifier.this.count++;
prevSequence = pojo.sequence;
}
};
@@ -144,7 +144,7 @@ public class DeduperOrderingTest
if (pojo.getSequence() < prevSequence) {
testFailed = true;
}
- count++;
+ Verifier.this.count++;
prevSequence = pojo.sequence;
}
};
@@ -158,7 +158,7 @@ public class DeduperOrderingTest
if (pojo.getSequence() < prevSequence) {
testFailed = true;
}
- count++;
+ Verifier.this.count++;
prevSequence = pojo.sequence;
}
};
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index adc6de5..dc9ed8b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.apex</groupId>
<artifactId>apex</artifactId>
- <version>3.4.0</version>
+ <version>3.6.0</version>
</parent>
<groupId>org.apache.apex</groupId>
@@ -49,7 +49,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.deploy.skip>false</maven.deploy.skip>
- <apex.core.version>3.4.0</apex.core.version>
+ <apex.core.version>3.6.0</apex.core.version>
<semver.plugin.skip>false</semver.plugin.skip>
<surefire.args>-Xmx2048m</surefire.args>
</properties>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
index 99d5ca6..5cc5d98 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
@@ -32,6 +32,7 @@ import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
import static org.apache.apex.malhar.stream.api.Option.Options.name;
@@ -79,8 +80,11 @@ public class ApexStreamImplTest
// Assert the stream is from first operator to second operator
Assert.assertEquals("first", stream.getSource().getOperatorMeta().getName());
- Assert.assertTrue(1 == stream.getSinks().size());
- Assert.assertEquals("second", stream.getSinks().get(0).getOperatorWrapper().getName());
+ Collection<InputPortMeta> portMetaCollection = stream.getSinks();
+ Assert.assertTrue(1 == portMetaCollection.size());
+ for (InputPortMeta inputPortMeta : portMetaCollection) {
+ Assert.assertEquals("second", inputPortMeta.getOperatorMeta().getName());
+ }
// Assert the stream is thread local
Assert.assertTrue(stream.getLocality() == DAG.Locality.THREAD_LOCAL);