Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:12:38 UTC

[21/50] [abbrv] incubator-apex-malhar git commit: MLHR-1960 #comment Clean up resources that would conflict with tests in the before-test setup

MLHR-1960 #comment Clean up resources that would conflict with tests in the before-test setup


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/32b1c67e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/32b1c67e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/32b1c67e

Branch: refs/heads/master
Commit: 32b1c67eeca155ba17b84a3d61827764babc605a
Parents: 216c56a
Author: Siyuan Hua <hs...@apache.org>
Authored: Mon Jan 4 17:06:56 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Mon Jan 4 17:06:56 2016 -0800

----------------------------------------------------------------------
 .../contrib/kafka/KafkaInputOperatorTest.java   | 127 +++++++++++--------
 .../contrib/kafka/KafkaOperatorTestBase.java    |  10 +-
 2 files changed, 83 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
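
The core of the change is moving per-test cleanup from @After to @Before: a run
that is killed before its @After hook fires leaves stale checkpoint directories
behind, while a @Before cleanup guarantees a clean slate no matter how the
previous run ended. A minimal sketch of the pattern, using a hypothetical test
class (the "target/ck" directory name is taken from the diff below):

  import java.io.File;

  import org.apache.commons.io.FileUtils;
  import org.junit.Before;

  // Hypothetical minimal reproduction of the pattern, not the actual test.
  public class CleanBeforeTestSketch
  {
    @Before
    public void beforeTest()
    {
      // Remove leftovers from a previous (possibly crashed) run. An @After
      // cleanup never runs if the JVM dies mid-test; this runs on every
      // fresh start regardless of how the last run ended.
      FileUtils.deleteQuietly(new File("target", "ck"));
    }
  }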


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/32b1c67e/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index 2a5a38d..9db1355 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -18,6 +18,26 @@
  */
 package com.datatorrent.contrib.kafka;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.LoggerFactory;
+
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
@@ -32,25 +52,8 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.partitioner.StatelessPartitionerTest;
 import com.datatorrent.lib.testbench.CollectorTestSink;
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-import org.slf4j.LoggerFactory;
+import com.datatorrent.stram.StramLocalCluster;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.conf.Configuration;
 
 public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 {
@@ -230,11 +233,20 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
   }
 
   @Override
-  @After
-  public void afterTest()
+  @Before
+  public void beforeTest()
   {
     tupleCount.set(0);
-    super.afterTest();
+    File syncCheckPoint = new File("target", "ck");
+    File localFiles = new File("target" + StramLocalCluster.class.getName());
+    try {
+      FileUtils.deleteQuietly(syncCheckPoint);
+      FileUtils.deleteQuietly(localFiles);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      super.beforeTest();
+    }
   }
 
   public static class TestMeta extends TestWatcher
@@ -251,9 +263,9 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
       String methodName = description.getMethodName();
       String className = description.getClassName();
       baseDir = "target/" + className + "/" + methodName;
-      recoveryDir = baseDir + "/" + "recovery";
+      recoveryDir = "recovery";
       try {
-        FileUtils.deleteDirectory(new File(recoveryDir));
+        FileUtils.deleteDirectory(new File(baseDir, "recovery"));
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -276,10 +288,50 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     p.setSendCount(totalCount);
     new Thread(p).start();
 
+
+    KafkaSinglePortStringInputOperator operator = createAndDeployOperator();
+    latch.await(4000, TimeUnit.MILLISECONDS);
+    operator.beginWindow(1);
+    operator.emitTuples();
+    operator.endWindow();
+    operator.beginWindow(2);
+    operator.emitTuples();
+    operator.endWindow();
+
+    //failure and then re-deployment of operator
+    testMeta.sink.collectedTuples.clear();
+    operator.teardown();
+    operator.deactivate();
+
+    operator = createAndDeployOperator();
+    Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+
+    operator.beginWindow(1);
+    operator.emitTuples();
+    operator.endWindow();
+    operator.beginWindow(2);
+    operator.emitTuples();
+    operator.endWindow();
+    latch.await(3000, TimeUnit.MILLISECONDS);
+    // Emitting data after all recovery windows are replayed
+    operator.beginWindow(3);
+    operator.emitTuples();
+    operator.endWindow();
+
+    Assert.assertEquals("Total messages collected ", totalCount, testMeta.sink.collectedTuples.size());
+    testMeta.sink.collectedTuples.clear();
+    operator.teardown();
+    operator.deactivate();
+  }
+
+  private KafkaSinglePortStringInputOperator createAndDeployOperator()
+  {
+
     Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
     attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
     attributeMap.put(Context.DAGContext.APPLICATION_PATH, testMeta.baseDir);
 
+
     testMeta.context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
     testMeta.operator = new KafkaSinglePortStringInputOperator();
 
@@ -304,37 +356,12 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     testMeta.sink = new CollectorTestSink<Object>();
     testMeta.operator.outputPort.setSink(testMeta.sink);
     operator.outputPort.setSink(testMeta.sink);
-    operator.setup(testMeta.context);
-    operator.activate(testMeta.context);
-    latch.await(4000, TimeUnit.MILLISECONDS);
-    operator.beginWindow(1);
-    operator.emitTuples();
-    operator.endWindow();
-    operator.beginWindow(2);
-    operator.emitTuples();
-    operator.endWindow();
 
-    //failure and then re-deployment of operator
-    testMeta.sink.collectedTuples.clear();
-    operator.teardown();
     operator.setup(testMeta.context);
+    operator.activate(testMeta.context);
 
-    Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+    return operator;
 
-    operator.beginWindow(1);
-    operator.emitTuples();
-    operator.endWindow();
-    operator.beginWindow(2);
-    operator.emitTuples();
-    operator.endWindow();
-    latch.await(3000, TimeUnit.MILLISECONDS);
-    // Emiting data after all recovery windows are replayed
-    operator.beginWindow(3);
-    operator.emitTuples();
-    operator.endWindow();
-
-    Assert.assertEquals("Total messages collected ", totalCount, testMeta.sink.collectedTuples.size());
-    testMeta.sink.collectedTuples.clear();
   }
 
   @Test

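Two details of the new KafkaInputOperatorTest code above are worth noting.
First, FileUtils.deleteQuietly() swallows all exceptions and returns a boolean,
so the catch block in the new beforeTest() can never fire; the finally block
alone is what guarantees super.beforeTest() runs. If a cleanup failure should
fail the test instead of being ignored, the commons-io call that actually
throws is forceDelete(). A sketch of that variant (a hypothetical helper, not
what this commit does):

  import java.io.File;
  import java.io.IOException;

  import org.apache.commons.io.FileUtils;

  // Hypothetical helper that surfaces cleanup failures instead of hiding them.
  public class CheckedCleanup
  {
    public static void cleanOrFail(File dir)
    {
      try {
        if (dir.exists()) {
          FileUtils.forceDelete(dir); // throws IOException on failure
        }
      } catch (IOException e) {
        throw new RuntimeException("could not clean " + dir, e);
      }
    }
  }

Second, "target" + StramLocalCluster.class.getName() concatenates with no path
separator; if the directory on disk is really target/<class name>, the
two-argument form new File("target", StramLocalCluster.class.getName()) would
be the one that matches it.
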
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/32b1c67e/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
index f4f5ef2..64651f4 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
@@ -26,8 +26,8 @@ import java.util.Properties;
 import kafka.admin.TopicCommand;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
-import kafka.utils.Utils;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZKDatabase;
@@ -71,7 +71,8 @@ public class KafkaOperatorTestBase
   {
 
     try {
-
+      // before start, clean the zookeeper files if they exist
+      FileUtils.deleteQuietly(new File(baseDir, zkBaseDir));
       int clientPort = TEST_ZOOKEEPER_PORT[clusterId];
       int numConnections = 10;
       int tickTime = 2000;
@@ -96,11 +97,13 @@ public class KafkaOperatorTestBase
         zkf.shutdown();
       }
     }
-    Utils.rm(new File(baseDir, zkBaseDir));
   }
 
   public void startKafkaServer(int clusterid, int brokerid, int defaultPartitions)
   {
+    // before start, clean the kafka dir if it exists
+    FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
+
     Properties props = new Properties();
     props.setProperty("broker.id", "" + brokerid);
     props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid][brokerid]).toString());
@@ -149,7 +152,6 @@ public class KafkaOperatorTestBase
         }
       }
     }
-    Utils.rm(new File(baseDir, kafkaBaseDir));
   }
 
   @Before
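
In KafkaOperatorTestBase the same idea is applied to the embedded servers: the
kafka.utils.Utils.rm() calls that ran after shutdown are replaced by
FileUtils.deleteQuietly() before startup, so state left behind by a run that
never reached its shutdown path is cleared the next time the server starts. A
minimal sketch of the delete-before-start pattern, assuming a hypothetical
harness class rather than the actual Malhar test base:

  import java.io.File;

  import org.apache.commons.io.FileUtils;

  // Hypothetical embedded-server harness illustrating delete-before-start.
  public class EmbeddedServerHarness
  {
    private final File dataDir;

    public EmbeddedServerHarness(File baseDir, String name)
    {
      this.dataDir = new File(baseDir, name);
    }

    public void start()
    {
      // Clean before start rather than after stop: a JVM that dies mid-test
      // skips the after-stop cleanup, but it always passes through here on
      // the next run.
      FileUtils.deleteQuietly(dataDir);
      // ... launch the embedded server against the now-empty directory ...
    }

    public void stop()
    {
      // No directory removal here; the next start() takes care of it.
    }
  }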