You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/04/17 08:43:49 UTC

apex-malhar git commit: APEXMALHAR-2460 Redshift output module unable to emit tuples.

Repository: apex-malhar
Updated Branches:
  refs/heads/master 176c5efe8 -> dd80369d4


APEXMALHAR-2460 Redshift output module unable to emit tuples.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/dd80369d
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/dd80369d
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/dd80369d

Branch: refs/heads/master
Commit: dd80369d4177bd8ce31b16562fbcdca792ae1ac7
Parents: 176c5ef
Author: deepak-narkhede <ma...@gmail.com>
Authored: Wed Mar 29 17:10:44 2017 +0530
Committer: deepak-narkhede <ma...@gmail.com>
Committed: Thu Apr 13 11:22:22 2017 +0530

----------------------------------------------------------------------
 .../apex/malhar/lib/fs/s3/S3Reconciler.java     |  4 +-
 .../apex/malhar/lib/fs/s3/S3ReconcilerTest.java | 49 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dd80369d/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
index 1e7b68c..dc76e91 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
@@ -174,11 +174,11 @@ public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator.
     while (doneTuples.peek() != null) {
       FSRecordCompactionOperator.OutputMetaData metaData = doneTuples.poll();
       removeIntermediateFiles(metaData);
-      /*if (outputPort.isConnected()) {
+      if (outputPort.isConnected()) {
         // Emit the meta data with S3 path
         metaData.setPath(getDirectoryName() + Path.SEPARATOR + metaData.getFileName());
         outputPort.emit(metaData);
-      }*/
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dd80369d/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
index 9023b5c..d138b53 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
@@ -42,6 +42,7 @@ import com.amazonaws.services.s3.model.PutObjectResult;
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
+import com.datatorrent.lib.testbench.CollectorTestSink;
 
 import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
 import static org.mockito.Matchers.any;
@@ -54,6 +55,7 @@ public class S3ReconcilerTest
   {
     S3Reconciler underTest;
     Context.OperatorContext context;
+    CollectorTestSink<Object> sink;
 
     @Mock
     AmazonS3 s3clientMock;
@@ -102,6 +104,53 @@ public class S3ReconcilerTest
   public TestMeta testMeta = new TestMeta();
 
   @Test
+  public void verifyS3ReconclierOutputTuple() throws Exception
+  {
+    String fileName = "s3-compaction_1.0";
+    String path = testMeta.outputPath + Path.SEPARATOR + fileName;
+    long size = 80;
+
+    File file = new File(path);
+    File tmpFile = new File(path + "." + System.currentTimeMillis() + ".tmp");
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < 10; i++) {
+      sb.append("Record" + i + "\n");
+      if (i == 5) {
+        FileUtils.write(tmpFile, sb.toString());
+      }
+    }
+    FileUtils.write(file, sb.toString());
+
+    // Set test sink and later on collect the emitted tuples in this sink.
+    testMeta.sink = new CollectorTestSink<Object>();
+    testMeta.underTest.outputPort.setSink(testMeta.sink);
+
+    // Create meta information to be emitted as tuple.
+    FSRecordCompactionOperator.OutputMetaData outputMetaData = new FSRecordCompactionOperator.OutputMetaData(path, fileName, size);
+    testMeta.underTest.beginWindow(0);
+    testMeta.underTest.input.process(outputMetaData);
+    testMeta.underTest.endWindow();
+
+    for (int i = 1; i < 60; i++) {
+      testMeta.underTest.beginWindow(i);
+      testMeta.underTest.endWindow();
+    }
+    testMeta.underTest.committed(59);
+
+    // wait (with a timeout) until the sink has collected the emitted tuple.
+    testMeta.sink.waitForResultCount(1, 12000);
+
+    for (int i = 60; i < 70; i++) {
+      testMeta.underTest.beginWindow(i);
+      Thread.sleep(10);
+      testMeta.underTest.endWindow();
+    }
+
+    // verify the number of tuples emitted.
+    Assert.assertEquals(1, testMeta.sink.getCount(false));
+  }
+
+  @Test
   public void testFileClearing() throws Exception
   {
     String fileName = "s3-compaction_1.0";