You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:12:38 UTC
[21/50] [abbrv] incubator-apex-malhar git commit: MLHR-1960 #comment
Clean resources that would conflict with tests to be run in before test
MLHR-1960 #comment Clean resources that would conflict with tests to be run in before test
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/32b1c67e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/32b1c67e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/32b1c67e
Branch: refs/heads/master
Commit: 32b1c67eeca155ba17b84a3d61827764babc605a
Parents: 216c56a
Author: Siyuan Hua <hs...@apache.org>
Authored: Mon Jan 4 17:06:56 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Mon Jan 4 17:06:56 2016 -0800
----------------------------------------------------------------------
.../contrib/kafka/KafkaInputOperatorTest.java | 127 +++++++++++--------
.../contrib/kafka/KafkaOperatorTestBase.java | 10 +-
2 files changed, 83 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/32b1c67e/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index 2a5a38d..9db1355 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -18,6 +18,26 @@
*/
package com.datatorrent.contrib.kafka;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.LoggerFactory;
+
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
@@ -32,25 +52,8 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.io.IdempotentStorageManager;
import com.datatorrent.lib.partitioner.StatelessPartitionerTest;
import com.datatorrent.lib.testbench.CollectorTestSink;
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-import org.slf4j.LoggerFactory;
+import com.datatorrent.stram.StramLocalCluster;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.conf.Configuration;
public class KafkaInputOperatorTest extends KafkaOperatorTestBase
{
@@ -230,11 +233,20 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
}
@Override
- @After
- public void afterTest()
+ @Before
+ public void beforeTest()
{
tupleCount.set(0);
- super.afterTest();
+ File syncCheckPoint = new File("target", "ck");
+ File localFiles = new File("target" + StramLocalCluster.class.getName());
+ try {
+ FileUtils.deleteQuietly(syncCheckPoint);
+ FileUtils.deleteQuietly(localFiles);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ super.beforeTest();
+ }
}
public static class TestMeta extends TestWatcher
@@ -251,9 +263,9 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
String methodName = description.getMethodName();
String className = description.getClassName();
baseDir = "target/" + className + "/" + methodName;
- recoveryDir = baseDir + "/" + "recovery";
+ recoveryDir = "recovery";
try {
- FileUtils.deleteDirectory(new File(recoveryDir));
+ FileUtils.deleteDirectory(new File(baseDir, "recovery"));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -276,10 +288,50 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
p.setSendCount(totalCount);
new Thread(p).start();
+
+ KafkaSinglePortStringInputOperator operator = createAndDeployOperator();
+ latch.await(4000, TimeUnit.MILLISECONDS);
+ operator.beginWindow(1);
+ operator.emitTuples();
+ operator.endWindow();
+ operator.beginWindow(2);
+ operator.emitTuples();
+ operator.endWindow();
+
+ //failure and then re-deployment of operator
+ testMeta.sink.collectedTuples.clear();
+ operator.teardown();
+ operator.deactivate();
+
+ operator = createAndDeployOperator();
+ Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+
+ operator.beginWindow(1);
+ operator.emitTuples();
+ operator.endWindow();
+ operator.beginWindow(2);
+ operator.emitTuples();
+ operator.endWindow();
+ latch.await(3000, TimeUnit.MILLISECONDS);
+ // Emiting data after all recovery windows are replayed
+ operator.beginWindow(3);
+ operator.emitTuples();
+ operator.endWindow();
+
+ Assert.assertEquals("Total messages collected ", totalCount, testMeta.sink.collectedTuples.size());
+ testMeta.sink.collectedTuples.clear();
+ operator.teardown();
+ operator.deactivate();
+ }
+
+ private KafkaSinglePortStringInputOperator createAndDeployOperator()
+ {
+
Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
attributeMap.put(Context.DAGContext.APPLICATION_PATH, testMeta.baseDir);
+
testMeta.context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
testMeta.operator = new KafkaSinglePortStringInputOperator();
@@ -304,37 +356,12 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
testMeta.sink = new CollectorTestSink<Object>();
testMeta.operator.outputPort.setSink(testMeta.sink);
operator.outputPort.setSink(testMeta.sink);
- operator.setup(testMeta.context);
- operator.activate(testMeta.context);
- latch.await(4000, TimeUnit.MILLISECONDS);
- operator.beginWindow(1);
- operator.emitTuples();
- operator.endWindow();
- operator.beginWindow(2);
- operator.emitTuples();
- operator.endWindow();
- //failure and then re-deployment of operator
- testMeta.sink.collectedTuples.clear();
- operator.teardown();
operator.setup(testMeta.context);
+ operator.activate(testMeta.context);
- Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+ return operator;
- operator.beginWindow(1);
- operator.emitTuples();
- operator.endWindow();
- operator.beginWindow(2);
- operator.emitTuples();
- operator.endWindow();
- latch.await(3000, TimeUnit.MILLISECONDS);
- // Emiting data after all recovery windows are replayed
- operator.beginWindow(3);
- operator.emitTuples();
- operator.endWindow();
-
- Assert.assertEquals("Total messages collected ", totalCount, testMeta.sink.collectedTuples.size());
- testMeta.sink.collectedTuples.clear();
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/32b1c67e/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
index f4f5ef2..64651f4 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
@@ -26,8 +26,8 @@ import java.util.Properties;
import kafka.admin.TopicCommand;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
-import kafka.utils.Utils;
+import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZKDatabase;
@@ -71,7 +71,8 @@ public class KafkaOperatorTestBase
{
try {
-
+ //before start, clean the zookeeper files if it exists
+ FileUtils.deleteQuietly(new File(baseDir, zkBaseDir));
int clientPort = TEST_ZOOKEEPER_PORT[clusterId];
int numConnections = 10;
int tickTime = 2000;
@@ -96,11 +97,13 @@ public class KafkaOperatorTestBase
zkf.shutdown();
}
}
- Utils.rm(new File(baseDir, zkBaseDir));
}
public void startKafkaServer(int clusterid, int brokerid, int defaultPartitions)
{
+ // before start, clean the kafka dir if it exists
+ FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
+
Properties props = new Properties();
props.setProperty("broker.id", "" + brokerid);
props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid][brokerid]).toString());
@@ -149,7 +152,6 @@ public class KafkaOperatorTestBase
}
}
}
- Utils.rm(new File(baseDir, kafkaBaseDir));
}
@Before