Posted to commits@apex.apache.org by th...@apache.org on 2017/07/02 19:04:56 UTC
[3/5] apex-malhar git commit: APEXMALHAR-2459 1) Refactor the existing
Kafka Input Operator. 2) Add support for KafkaInputOperator using the 0.10
consumer API
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java b/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
new file mode 100644
index 0000000..5b54979
--- /dev/null
+++ b/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.Properties;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import com.datatorrent.api.DefaultOutputPort;
+
+/**
+ * An example single-port input operator that emits only byte-array messages.
+ * The key and cluster information are ignored; only the message value is
+ * emitted on the single output port.
+ *
+ * @since 3.3.0
+ */
+@InterfaceStability.Evolving
+public class KafkaSinglePortInputOperator extends AbstractKafkaInputOperator
+{
+ /**
+ * This output port emits tuples extracted from Kafka messages.
+ */
+ public final transient DefaultOutputPort<byte[]> outputPort = new DefaultOutputPort<>();
+
+ /**
+ * Create the Kafka 0.9 Consumer
+ * @param properties properties
+ * @return consumer
+ */
+ @Override
+ public AbstractKafkaConsumer createConsumer(Properties properties)
+ {
+ return new KafkaConsumer09(properties);
+ }
+
+ @Override
+ protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message)
+ {
+ outputPort.emit(message.value());
+ }
+
+}
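For orientation, a minimal sketch (not part of this commit) of wiring this operator into an Apex application; the application class name, topic, and broker address below are assumptions:

  import org.apache.hadoop.conf.Configuration;

  import com.datatorrent.api.DAG;
  import com.datatorrent.api.StreamingApplication;

  public class KafkaReaderApp implements StreamingApplication
  {
    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      KafkaSinglePortInputOperator in =
          dag.addOperator("kafkaInput", KafkaSinglePortInputOperator.class);
      in.setTopics("demo");               // hypothetical topic
      in.setClusters("localhost:9092");   // hypothetical broker address
      in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
      // connect in.outputPort to a downstream operator with dag.addStream(...)
    }
  }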
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
----------------------------------------------------------------------
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
new file mode 100644
index 0000000..e9fcc36
--- /dev/null
+++ b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import com.google.common.base.Throwables;
+
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+
+public class EmbeddedKafka
+{
+ private static final String KAFKA_PATH = "/tmp/kafka-test";
+
+ private ZkClient zkClient;
+ private ZkUtils zkUtils;
+ private String BROKERHOST = "127.0.0.1";
+ private String BROKERPORT = "9092";
+ private EmbeddedZookeeper zkServer;
+ private KafkaServer kafkaServer;
+
+ public String getBroker()
+ {
+ return BROKERHOST + ":" + BROKERPORT;
+ }
+
+ public void start() throws IOException
+ {
+ // Find a free port for the broker
+ try {
+ ServerSocket serverSocket = new ServerSocket(0);
+ BROKERPORT = Integer.toString(serverSocket.getLocalPort());
+ serverSocket.close();
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+
+ // Setup Zookeeper
+ zkServer = new EmbeddedZookeeper();
+ String zkConnect = BROKERHOST + ":" + zkServer.port();
+ zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+ zkUtils = ZkUtils.apply(zkClient, false);
+
+ // Setup brokers
+ cleanupDir();
+ Properties props = new Properties();
+ props.setProperty("zookeeper.connect", zkConnect);
+ props.setProperty("broker.id", "0");
+ props.setProperty("log.dirs", KAFKA_PATH);
+ props.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+ KafkaConfig config = new KafkaConfig(props);
+ Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);
+ }
+
+ public void stop() throws IOException
+ {
+ kafkaServer.shutdown();
+ zkClient.close();
+ zkServer.shutdown();
+ cleanupDir();
+ }
+
+ private void cleanupDir() throws IOException
+ {
+ FileUtils.deleteDirectory(new File(KAFKA_PATH));
+ }
+
+ public void createTopic(String topic)
+ {
+ AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
+ List<KafkaServer> servers = new ArrayList<KafkaServer>();
+ servers.add(kafkaServer);
+ TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 30000);
+ }
+
+ public void publish(String topic, List<String> messages)
+ {
+ Properties producerProps = new Properties();
+ producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
+ producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
+ producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+ try (KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps)) {
+ for (String message : messages) {
+ ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(topic, message.getBytes(StandardCharsets.UTF_8));
+ producer.send(data);
+ }
+ }
+
+ List<KafkaServer> servers = new ArrayList<KafkaServer>();
+ servers.add(kafkaServer);
+ TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 30000);
+ }
+
+ public List<String> consume(String topic, int timeout)
+ {
+ return consume(topic, timeout, true);
+ }
+
+ public List<String> consume(String topic, int timeout, boolean earliest)
+ {
+ Properties consumerProps = new Properties();
+ consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
+ consumerProps.setProperty("group.id", "group0");
+ consumerProps.setProperty("client.id", "consumer0");
+ consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+ consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ // start from the beginning of the topic when 'earliest' is requested
+ consumerProps.put("auto.offset.reset", earliest ? "earliest" : "latest");
+ KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
+ consumer.subscribe(Arrays.asList(topic));
+
+ List<String> messages = new ArrayList<>();
+
+ ConsumerRecords<Integer, byte[]> records = consumer.poll(timeout);
+ for (ConsumerRecord<Integer, byte[]> record : records) {
+ messages.add(new String(record.value()));
+ }
+
+ consumer.close();
+
+ return messages;
+ }
+
+}
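For reference, a minimal usage sketch (not part of this commit); the topic name and message contents are assumptions:

  EmbeddedKafka kafka = new EmbeddedKafka();
  kafka.start();                                    // boots embedded ZooKeeper and one broker on a free port
  kafka.createTopic("demo");                        // hypothetical topic
  kafka.publish("demo", Arrays.asList("m1", "m2"));
  List<String> read = kafka.consume("demo", 5000);  // poll up to 5 seconds from the earliest offset
  kafka.stop();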
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
new file mode 100644
index 0000000..75483e9
--- /dev/null
+++ b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+public class KafkaConsumerPropertiesTest extends AbstractKafkaConsumerPropertiesTest
+{
+ @Override
+ public AbstractKafkaInputOperator createKafkaInputOperator()
+ {
+ return new KafkaSinglePortInputOperator();
+ }
+
+ @Override
+ public String expectedException()
+ {
+ return "java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in "
+ + "secure mode.";
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
----------------------------------------------------------------------
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
new file mode 100644
index 0000000..abf3a5b
--- /dev/null
+++ b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * Test serializer/deserializer that encodes a Person as
+ * [4-byte name length][name bytes][4-byte age].
+ */
+public class KafkaHelper implements Serializer<KafkaOutputOperatorTest.Person>,
+ Deserializer<KafkaOutputOperatorTest.Person>
+{
+ @Override
+ public KafkaOutputOperatorTest.Person deserialize(String s, byte[] bytes)
+ {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int nameLength = byteBuffer.getInt();
+ byte[] name = new byte[nameLength];
+
+ byteBuffer.get(name, 0, nameLength);
+
+ return new KafkaOutputOperatorTest.Person(new String(name), byteBuffer.getInt());
+ }
+
+ @Override
+ public byte[] serialize(String s, KafkaOutputOperatorTest.Person person)
+ {
+ byte[] name = person.name.getBytes();
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(name.length + 4 + 4);
+
+ byteBuffer.putInt(name.length);
+ byteBuffer.put(name);
+ byteBuffer.putInt(person.age);
+
+ return byteBuffer.array();
+ }
+
+ @Override
+ public void configure(Map<String, ?> map, boolean b)
+ {
+ }
+
+ @Override
+ public void close()
+ {
+ }
+}
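To illustrate the binary layout, a round-trip sketch (not part of this commit; the name and age are arbitrary):

  KafkaHelper helper = new KafkaHelper();
  KafkaOutputOperatorTest.Person ann = new KafkaOutputOperatorTest.Person("ann", 30);
  byte[] bytes = helper.serialize("any-topic", ann);
  // layout: 4-byte name length + 3 name bytes + 4-byte age = 11 bytes total
  KafkaOutputOperatorTest.Person copy = helper.deserialize("any-topic", bytes);
  assert copy.equals(ann);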
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
new file mode 100644
index 0000000..09d878d
--- /dev/null
+++ b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * A set of tests verifying that the input operator is automatically partitioned
+ * per Kafka partition. These tests launch their own Kafka cluster.
+ */
+@RunWith(Parameterized.class)
+public class KafkaInputOperatorTest extends KafkaOperatorTestBase
+{
+
+ private int totalBrokers = 0;
+
+ private String partition = null;
+
+ private String testName = "";
+
+ public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
+
+ public class KafkaTestInfo extends TestWatcher
+ {
+ public org.junit.runner.Description desc;
+
+ public String getDir()
+ {
+ String methodName = desc.getMethodName();
+ String className = desc.getClassName();
+ return "target/" + className + "/" + methodName + "/" + testName;
+ }
+
+ @Override
+ protected void starting(org.junit.runner.Description description)
+ {
+ this.desc = description;
+ }
+ }
+
+ @Rule
+ public final KafkaTestInfo testInfo = new KafkaTestInfo();
+
+ @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
+ public static Collection<Object[]> testScenario()
+ {
+ return Arrays.asList(new Object[][]{
+ {true, false, "one_to_one"}, // multi cluster with single partition
+ {true, false, "one_to_many"},
+ {true, true, "one_to_one"}, // multi cluster with multi partitions
+ {true, true, "one_to_many"},
+ {false, true, "one_to_one"}, // single cluster with multi partitions
+ {false, true, "one_to_many"},
+ {false, false, "one_to_one"}, // single cluster with single partition
+ {false, false, "one_to_many"}
+ });
+ }
+
+ @Before
+ public void before()
+ {
+ testName = TEST_TOPIC + testCounter++;
+ logger.info("before() test case: {}", testName);
+ tupleCollection.clear();
+ //reset count for next new test case
+ k = 0;
+
+ createTopic(0, testName);
+ if (hasMultiCluster) {
+ createTopic(1, testName);
+ }
+
+ }
+
+ public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
+ {
+ // This class initializes several Kafka brokers for multiple partitions
+ this.hasMultiCluster = hasMultiCluster;
+ this.hasMultiPartition = hasMultiPartition;
+ int cluster = 1 + (hasMultiCluster ? 1 : 0);
+ totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
+ this.partition = partition;
+ }
+
+ private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
+ private static List<String> tupleCollection = new LinkedList<>();
+
+ /**
+ * whether the countdown latch counts all tuples or just END_TUPLE markers
+ */
+ private static final boolean countDownAll = false;
+ private static final int scale = 2;
+ private static final int totalCount = 10 * scale;
+ private static final int failureTrigger = 3 * scale;
+ private static final int tuplesPerWindow = 5 * scale;
+ private static final int waitTime = 60000 + 300 * scale;
+
+ //This latch was used to count END_TUPLE markers, but tuple order can't be guaranteed,
+ //so count valid tuples instead.
+ private static CountDownLatch latch;
+ private static boolean hasFailure = false;
+ private static int k = 0;
+ private static Thread monitorThread;
+
+ /**
+ * Test operator that collects tuples from KafkaSinglePortInputOperator.
+ */
+ public static class CollectorModule extends BaseOperator
+ {
+ public final transient DefaultInputPort<byte[]> inputPort = new DefaultInputPort<byte[]>()
+ {
+ @Override
+ public void process(byte[] bt)
+ {
+ processTuple(bt);
+ }
+ };
+
+ long currentWindowId;
+
+ long operatorId;
+
+ boolean isIdempotentTest = false;
+
+ transient List<String> windowTupleCollector = Lists.newArrayList();
+ private transient Map<String, List<String>> tupleCollectedInWindow = new HashMap<>();
+ private int endTuples = 0;
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ operatorId = context.getId();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ currentWindowId = windowId;
+ windowTupleCollector.clear();
+ endTuples = 0;
+ }
+
+ public void processTuple(byte[] bt)
+ {
+ String tuple = new String(bt);
+ if (hasFailure && k++ == failureTrigger) {
+ //you can only kill yourself once
+ hasFailure = false;
+ throw new RuntimeException();
+ }
+ if (tuple.startsWith(KafkaOperatorTestBase.END_TUPLE)) {
+ endTuples++;
+ }
+
+ windowTupleCollector.add(tuple);
+ }
+
+ @Override
+ public void endWindow()
+ {
+ super.endWindow();
+ if (isIdempotentTest) {
+ String key = operatorId + "," + currentWindowId;
+ List<String> msgsInWin = tupleCollectedInWindow.get(key);
+ if (msgsInWin != null) {
+ Assert.assertEquals(
+ "replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector);
+ } else {
+ List<String> newList = Lists.newArrayList();
+ newList.addAll(windowTupleCollector);
+ tupleCollectedInWindow.put(key, newList);
+ }
+ }
+
+ //discard the tuples of this window if an exception happened
+ int tupleSize = windowTupleCollector.size();
+ tupleCollection.addAll(windowTupleCollector);
+
+ int countDownTupleSize = countDownAll ? tupleSize : endTuples;
+
+ if (latch != null) {
+ Assert.assertTrue(
+ "received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
+ while (countDownTupleSize > 0) {
+ latch.countDown();
+ --countDownTupleSize;
+ }
+ if (latch.getCount() == 0) {
+ /**
+ * The delay between countDown() and the shutdown of the application
+ * can cause a fatal error:
+ * "Catastrophic Error: Invalid State - the operator blocked forever!"
+ * because activeQueues may be cleared while 'alive' hasn't changed yet.
+ * Throw a ShutdownException to let the engine shut down.
+ */
+ try {
+ throw new ShutdownException();
+ //lc.shutdown();
+ } finally {
+ /**
+ * Interrupt the engine thread so it wakes from sleep and handles
+ * the shutdown. At this point all payload has been handled, so it
+ * is safe to interrupt.
+ */
+ monitorThread.interrupt();
+ }
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Tests the Kafka single-port input operator (i.e. an input adapter for Kafka, aka consumer). This module
+ * receives data from an outside test generator through the Kafka message bus and feeds that data into the
+ * Malhar streaming platform.
+ *
+ * [Generate a message and send it to the Kafka message bus] ==> [Receive that message through the Kafka
+ * input adapter (i.e. consumer) and emit it on the output port]
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testInputOperator() throws Exception
+ {
+ hasFailure = false;
+ testInputOperator(false, false);
+ }
+
+ @Test
+ public void testInputOperatorWithFailure() throws Exception
+ {
+ hasFailure = true;
+ testInputOperator(true, false);
+ }
+
+ @Test
+ public void testIdempotentInputOperatorWithFailure() throws Exception
+ {
+ hasFailure = true;
+ testInputOperator(true, true);
+ }
+
+ public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception
+ {
+ // each broker should get an END_TUPLE message
+ latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);
+
+ logger.info(
+ "Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {};" +
+ " hasMultiPartition: {}, partition: {}",
+ testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);
+
+ // Start producer
+ KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster);
+ p.setSendCount(totalCount);
+ Thread t = new Thread(p);
+ t.start();
+
+ int expectedReceiveCount = totalCount + totalBrokers;
+
+ // Create DAG for testing.
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ // Create KafkaSinglePortInputOperator
+ KafkaSinglePortInputOperator node = dag.addOperator(
+ "Kafka input" + testName, KafkaSinglePortInputOperator.class);
+ node.setInitialPartitionCount(1);
+ // set topic
+ node.setTopics(testName);
+ node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
+ node.setClusters(getClusterConfig());
+ node.setStrategy(partition);
+ if (idempotent) {
+ node.setWindowDataManager(new FSWindowDataManager());
+ }
+
+ // Create Test tuple collector
+ CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class);
+ collector.isIdempotentTest = idempotent;
+
+ // Connect ports
+ dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort)
+ .setLocality(Locality.CONTAINER_LOCAL);
+
+ if (hasFailure) {
+ setupHasFailureTest(node, dag);
+ }
+
+ // Create local cluster
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(false);
+
+ //Let the Controller run inside another thread. This is almost the same as calling Controller.runAsync(),
+ //but Controller.runAsync() doesn't expose the thread that runs it,
+ //so we wouldn't know when that thread terminates.
+ //Create this thread and call join() to make sure the Controller shuts down completely.
+ monitorThread = new Thread((StramLocalCluster)lc, "master");
+ monitorThread.start();
+
+ boolean notTimeout = true;
+ try {
+ // Wait up to 60s for the consumer to finish consuming all the messages
+ notTimeout = latch.await(waitTime, TimeUnit.MILLISECONDS);
+ lc.shutdown();
+
+ //wait until the control thread has finished.
+ monitorThread.join();
+ } catch (Exception e) {
+ logger.warn(e.getMessage());
+ }
+
+ t.join();
+
+ if (!notTimeout || expectedReceiveCount != tupleCollection.size()) {
+ logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(),
+ expectedReceiveCount, testName, tupleCollection);
+ }
+ Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: "
+ + tupleCollection, notTimeout);
+
+ // Check results
+ Assert.assertTrue("testName: " + testName + "; Collected tuple size: " + tupleCollection.size()
+ + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection,
+ expectedReceiveCount == tupleCollection.size());
+
+ logger.info("End of test case: {}", testName);
+ }
+
+ private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag)
+ {
+ operator.setHoldingBufferSize(5000);
+ dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
+ //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(
+ // APPLICATION_PATH + "failureck", new Configuration()));
+ operator.setMaxTuplesPerWindow(tuplesPerWindow);
+ }
+
+ private String getClusterConfig()
+ {
+ String l = "localhost:";
+ return l + TEST_KAFKA_BROKER_PORT[0] +
+ (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1] : "");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
new file mode 100644
index 0000000..235eeba
--- /dev/null
+++ b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.Properties;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+import kafka.admin.TopicCommand;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.utils.ZkUtils;
+
+/**
+ * Base class that sets up and cleans the Kafka testing environment for all the input/output tests.
+ * For a multi-partition test, this class creates 2 Kafka partitions.
+ */
+public class KafkaOperatorTestBase
+{
+
+ public static final String END_TUPLE = "END_TUPLE";
+ public static final int[] TEST_ZOOKEEPER_PORT;
+ public static final int[] TEST_KAFKA_BROKER_PORT;
+ public static final String TEST_TOPIC = "testtopic";
+ public static int testCounter = 0;
+
+ // get available ports
+ static {
+ ServerSocket[] listeners = new ServerSocket[6];
+ int[] p = new int[6];
+
+ try {
+ for (int i = 0; i < 6; i++) {
+ listeners[i] = new ServerSocket(0);
+ p[i] = listeners[i].getLocalPort();
+ }
+
+ for (int i = 0; i < 6; i++) {
+ listeners[i].close();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ TEST_ZOOKEEPER_PORT = new int[]{p[0], p[1]};
+ TEST_KAFKA_BROKER_PORT = new int[]{p[2], p[3]};
+ }
+
+ static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaOperatorTestBase.class);
+ // since Kafka 0.8, use KafkaServerStartable instead of KafkaServer
+
+ // multiple brokers in multiple cluster
+ private static KafkaServerStartable[] broker = new KafkaServerStartable[2];
+
+ // multiple cluster
+ private static ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
+
+ private static ZooKeeperServer[] zkServer = new ZooKeeperServer[2];
+
+ public static String baseDir = "target";
+
+ private static final String zkBaseDir = "zookeeper-server-data";
+ private static final String kafkaBaseDir = "kafka-server-data";
+ private static final String[] zkdir = new String[]{"zookeeper-server-data/1", "zookeeper-server-data/2"};
+ private static final String[] kafkadir = new String[]{"kafka-server-data/1/1", "kafka-server-data/1/2"};
+ protected boolean hasMultiPartition = false;
+ protected boolean hasMultiCluster = false;
+
+ public static void startZookeeper(final int clusterId)
+ {
+ try {
+
+ int numConnections = 100;
+ int tickTime = 2000;
+ File dir = new File(baseDir, zkdir[clusterId]);
+
+ zkServer[clusterId] = new TestZookeeperServer(dir, dir, tickTime);
+ zkFactory[clusterId] = new NIOServerCnxnFactory();
+ zkFactory[clusterId].configure(new InetSocketAddress(TEST_ZOOKEEPER_PORT[clusterId]), numConnections);
+
+ zkFactory[clusterId].startup(zkServer[clusterId]); // start the zookeeper server.
+ Thread.sleep(2000);
+ //kserver.startup();
+ } catch (Exception ex) {
+ logger.error(ex.getLocalizedMessage());
+ }
+ }
+
+ public static void stopZookeeper()
+ {
+ for (ZooKeeperServer zs : zkServer) {
+ if (zs != null) {
+ zs.shutdown();
+ }
+ }
+
+ for (ServerCnxnFactory zkf : zkFactory) {
+ if (zkf != null) {
+ zkf.closeAll();
+ zkf.shutdown();
+ }
+ }
+ zkServer = new ZooKeeperServer[2];
+ zkFactory = new ServerCnxnFactory[2];
+ }
+
+ public static void startKafkaServer(int clusterid, int brokerid)
+ {
+ Properties props = new Properties();
+ props.setProperty("broker.id", "" + clusterid * 10 + brokerid);
+ props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid]).toString());
+ props.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]);
+ props.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid]);
+ props.setProperty("default.replication.factor", "1");
+ // set this to 50000 to boost performance so most test data stays in memory before being flushed to disk
+ props.setProperty("log.flush.interval.messages", "50000");
+
+ broker[clusterid] = new KafkaServerStartable(new KafkaConfig(props));
+ broker[clusterid].startup();
+
+ }
+
+ public static void startKafkaServer()
+ {
+
+ FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
+ //boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition },
+ // new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
+ startKafkaServer(0, 0);
+ //startKafkaServer(0, 1);
+ startKafkaServer(1, 0);
+ //startKafkaServer(1, 1);
+
+ // startup is an asynchronous operation; wait 2 sec for the server to start up
+
+ }
+
+ public static void stopKafkaServer()
+ {
+ for (int i = 0; i < broker.length; i++) {
+ if (broker[i] != null) {
+ broker[i].shutdown();
+ broker[i].awaitShutdown();
+ broker[i] = null;
+ }
+ }
+ }
+
+ @BeforeClass
+ public static void beforeTest()
+ {
+ try {
+ startZookeeper();
+ startKafkaServer();
+ } catch (java.nio.channels.CancelledKeyException ex) {
+ logger.debug("LSHIL {}", ex.getLocalizedMessage());
+ }
+ }
+
+ public static void startZookeeper()
+ {
+ FileUtils.deleteQuietly(new File(baseDir, zkBaseDir));
+ startZookeeper(0);
+ startZookeeper(1);
+ }
+
+ public void createTopic(int clusterid, String topicName)
+ {
+ String[] args = new String[9];
+ args[0] = "--zookeeper";
+ args[1] = "localhost:" + TEST_ZOOKEEPER_PORT[clusterid];
+ args[2] = "--replication-factor";
+ args[3] = "1";
+ args[4] = "--partitions";
+ if (hasMultiPartition) {
+ args[5] = "2";
+ } else {
+ args[5] = "1";
+ }
+ args[6] = "--topic";
+ args[7] = topicName;
+ args[8] = "--create";
+
+ ZkUtils zu = ZkUtils.apply("localhost:" + TEST_ZOOKEEPER_PORT[clusterid], 30000, 30000, false);
+ TopicCommand.createTopic(zu, new TopicCommand.TopicCommandOptions(args));
+
+ }
+
+ @AfterClass
+ public static void afterTest()
+ {
+ try {
+ stopKafkaServer();
+ stopZookeeper();
+ } catch (Exception ex) {
+ logger.debug("LSHIL {}", ex.getLocalizedMessage());
+ }
+ }
+
+ public void setHasMultiPartition(boolean hasMultiPartition)
+ {
+ this.hasMultiPartition = hasMultiPartition;
+ }
+
+ public void setHasMultiCluster(boolean hasMultiCluster)
+ {
+ this.hasMultiCluster = hasMultiCluster;
+ }
+
+ public static class TestZookeeperServer extends ZooKeeperServer
+ {
+
+ public TestZookeeperServer()
+ {
+ super();
+ }
+
+ public TestZookeeperServer(File snapDir, File logDir, int tickTime) throws IOException
+ {
+ super(snapDir, logDir, tickTime);
+ }
+
+ public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException
+ {
+ super(txnLogFactory, treeBuilder);
+ }
+
+ public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder)
+ throws IOException
+ {
+ super(txnLogFactory, tickTime, treeBuilder);
+ }
+
+ public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout,
+ int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
+ {
+ super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
+ }
+
+ @Override
+ protected void registerJMX()
+ {
+ }
+
+ @Override
+ protected void unregisterJMX()
+ {
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
new file mode 100644
index 0000000..2b4307f
--- /dev/null
+++ b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator;
+import com.datatorrent.common.util.BaseOperator;
+
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
+{
+ String testName;
+ private static List<Person> tupleCollection = new LinkedList<>();
+ private final String VALUE_DESERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
+ private final String VALUE_SERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
+
+ public static String APPLICATION_PATH = baseDir + File.separator + "MyKafkaApp" + File.separator;
+
+ @Before
+ public void before()
+ {
+ FileUtils.deleteQuietly(new File(APPLICATION_PATH));
+ testName = TEST_TOPIC + testCounter++;
+ createTopic(0, testName);
+ if (hasMultiCluster) {
+ createTopic(1, testName);
+ }
+ }
+
+ @After
+ public void after()
+ {
+ FileUtils.deleteQuietly(new File(APPLICATION_PATH));
+ }
+
+ @Test
+ public void testExactlyOnceWithFailure() throws Exception
+ {
+ List<Person> toKafka = GenerateList();
+
+ sendDataToKafka(true, toKafka, true, false);
+
+ List<Person> fromKafka = ReadFromKafka();
+
+ Assert.assertTrue("With Failure", compare(fromKafka, toKafka));
+ }
+
+ @Test
+ public void testExactlyOnceWithNoFailure() throws Exception
+ {
+ List<Person> toKafka = GenerateList();
+
+ sendDataToKafka(true, toKafka, false, false);
+
+ List<Person> fromKafka = ReadFromKafka();
+
+ Assert.assertTrue("With No Failure", compare(fromKafka, toKafka));
+ }
+
+ @Test
+ public void testExactlyOnceWithDifferentTuplesAfterRecovery() throws Exception
+ {
+ List<Person> toKafka = GenerateList();
+
+ try {
+ sendDataToKafka(true, toKafka, true, true);
+ } catch (RuntimeException ex) {
+
+ boolean expectedException = false;
+ if (ex.getMessage().contains("Violates")) {
+ expectedException = true;
+ }
+
+ Assert.assertTrue("Different tuples after recovery", expectedException);
+ return;
+ }
+
+ Assert.assertTrue("Wrong tuples during replay, should throw exception", false);
+ }
+
+ @Test
+ public void testKafkaOutput() throws Exception
+ {
+ List<Person> toKafka = GenerateList();
+
+ sendDataToKafka(false, toKafka, false, false);
+
+ List<Person> fromKafka = ReadFromKafka();
+
+ Assert.assertTrue("No failure", compare(fromKafka, toKafka));
+ }
+
+ @Test
+ public void testKafkaOutputWithFailure() throws Exception
+ {
+ List<Person> toKafka = GenerateList();
+
+ sendDataToKafka(false, toKafka, true, true);
+
+ List<Person> fromKafka = ReadFromKafka();
+
+ Assert.assertTrue("No failure", fromKafka.size() > toKafka.size());
+ }
+
+ private void sendDataToKafka(boolean exactlyOnce, List<Person> toKafka, boolean hasFailure,
+ boolean differentTuplesAfterRecovery) throws InterruptedException
+ {
+ Properties props = new Properties();
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER);
+ if (!exactlyOnce) {
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_SERIALIZER);
+ }
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
+ props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
+
+ Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp");
+ attributeMap.put(DAG.APPLICATION_PATH, APPLICATION_PATH);
+
+ OperatorContext operatorContext = mockOperatorContext(2, attributeMap);
+
+ cleanUp(operatorContext);
+
+ Operator kafkaOutput;
+ DefaultInputPort<Person> inputPort;
+
+ if (exactlyOnce) {
+ KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp =
+ ResetKafkaOutput(testName, props, operatorContext);
+ inputPort = kafkaOutputTemp.inputPort;
+ kafkaOutput = kafkaOutputTemp;
+ } else {
+ KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp =
+ ResetKafkaSimpleOutput(testName, props, operatorContext);
+ inputPort = kafkaOutputTemp.inputPort;
+ kafkaOutput = kafkaOutputTemp;
+ }
+
+ kafkaOutput.beginWindow(1);
+ inputPort.getSink().put(toKafka.get(0));
+ inputPort.getSink().put(toKafka.get(1));
+ inputPort.getSink().put(toKafka.get(2));
+ kafkaOutput.endWindow();
+ kafkaOutput.beginWindow(2);
+ inputPort.getSink().put(toKafka.get(3));
+ inputPort.getSink().put(toKafka.get(4));
+ inputPort.getSink().put(toKafka.get(5));
+ kafkaOutput.endWindow();
+ kafkaOutput.beginWindow(3);
+ inputPort.getSink().put(toKafka.get(6));
+ inputPort.getSink().put(toKafka.get(7));
+
+ if (hasFailure) {
+ if (exactlyOnce) {
+ KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp =
+ ResetKafkaOutput(testName, props, operatorContext);
+ inputPort = kafkaOutputTemp.inputPort;
+ kafkaOutput = kafkaOutputTemp;
+ } else {
+ KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp =
+ ResetKafkaSimpleOutput(testName, props, operatorContext);
+ inputPort = kafkaOutputTemp.inputPort;
+ kafkaOutput = kafkaOutputTemp;
+ }
+
+ kafkaOutput.beginWindow(2);
+ inputPort.getSink().put(toKafka.get(3));
+ inputPort.getSink().put(toKafka.get(4));
+ inputPort.getSink().put(toKafka.get(5));
+ kafkaOutput.endWindow();
+ kafkaOutput.beginWindow(3);
+ inputPort.getSink().put(toKafka.get(6));
+
+ if (!differentTuplesAfterRecovery) {
+ inputPort.getSink().put(toKafka.get(7));
+ }
+ }
+
+ inputPort.getSink().put(toKafka.get(8));
+ inputPort.getSink().put(toKafka.get(9));
+ kafkaOutput.endWindow();
+ kafkaOutput.beginWindow(4);
+ inputPort.getSink().put(toKafka.get(10));
+ inputPort.getSink().put(toKafka.get(11));
+ kafkaOutput.endWindow();
+
+ cleanUp(operatorContext);
+ }
+
+ private KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput(
+ String testName, Properties props, Context.OperatorContext operatorContext)
+ {
+ KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>();
+ kafkaOutput.setTopic(testName);
+ kafkaOutput.setProperties(props);
+ kafkaOutput.setup(operatorContext);
+
+ return kafkaOutput;
+ }
+
+ private KafkaSinglePortOutputOperator<String, Person> ResetKafkaSimpleOutput(
+ String testName, Properties props, Context.OperatorContext operatorContext)
+ {
+ KafkaSinglePortOutputOperator<String, Person> kafkaOutput = new KafkaSinglePortOutputOperator<>();
+ kafkaOutput.setTopic(testName);
+ kafkaOutput.setProperties(props);
+ kafkaOutput.setup(operatorContext);
+
+ return kafkaOutput;
+ }
+
+ private void cleanUp(Context.OperatorContext operatorContext)
+ {
+ FSWindowDataManager windowDataManager = new FSWindowDataManager();
+ windowDataManager.setup(operatorContext);
+ try {
+ windowDataManager.committed(windowDataManager.getLargestCompletedWindow());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private boolean compare(List<Person> fromKafka, List<Person> toKafka)
+ {
+ if (fromKafka.size() != toKafka.size()) {
+ return false;
+ }
+
+ for (int i = 0; i < fromKafka.size(); ++i) {
+ if (!fromKafka.get(i).equals(toKafka.get(i))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private String getClusterConfig()
+ {
+ String l = "localhost:";
+ return l + TEST_KAFKA_BROKER_PORT[0] +
+ (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1] : "");
+ }
+
+ private List<Person> GenerateList()
+ {
+ List<Person> people = new ArrayList<>();
+
+ for (Integer i = 0; i < 12; ++i) {
+ people.add(new Person(i.toString(), i));
+ }
+
+ return people;
+ }
+
+ private List<Person> ReadFromKafka()
+ {
+ tupleCollection.clear();
+
+ // Consumer properties for the input operator
+ Properties props = new Properties();
+ props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
+ props.put(KEY_DESERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_DESERIALIZER);
+ props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
+ props.put(GROUP_ID_CONFIG, "KafkaTest");
+
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ // Create KafkaSinglePortInputOperator
+ KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class);
+ node.setConsumerProps(props);
+ node.setInitialPartitionCount(1);
+ // set topic
+ node.setTopics(testName);
+ node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
+ node.setClusters(getClusterConfig());
+ node.setStrategy("one_to_one");
+
+ // Create Test tuple collector
+ CollectorModule collector1 = dag.addOperator("collector", new CollectorModule());
+
+ // Connect ports
+ dag.addStream("Kafka message", node.outputPort, collector1.inputPort);
+
+ // Create local cluster
+ final LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(false);
+
+ lc.run(30000);
+
+ return tupleCollection;
+ }
+
+ public static class CollectorModule extends BaseOperator
+ {
+ public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
+
+ long currentWindowId;
+ long operatorId;
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ operatorId = context.getId();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ currentWindowId = windowId;
+ }
+
+ @Override
+ public void endWindow()
+ {
+ super.endWindow();
+ }
+ }
+
+ public static class CollectorInputPort extends DefaultInputPort<byte[]>
+ {
+ CollectorModule ownerNode;
+
+ CollectorInputPort(CollectorModule node)
+ {
+ this.ownerNode = node;
+ }
+
+ @Override
+ public void process(byte[] bt)
+ {
+ tupleCollection.add(new KafkaHelper().deserialize("r", bt));
+ }
+ }
+
+ public static class Person
+ {
+ public String name;
+ public Integer age;
+
+ public Person(String name, Integer age)
+ {
+ this.name = name;
+ this.age = age;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Person person = (Person)o;
+
+ if (name != null ? !name.equals(person.name) : person.name != null) {
+ return false;
+ }
+
+ return age != null ? age.equals(person.age) : person.age == null;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = name != null ? name.hashCode() : 0;
+ result = 31 * result + (age != null ? age.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return name + age.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
new file mode 100644
index 0000000..6098bde
--- /dev/null
+++ b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+
+import kafka.utils.VerifiableProperties;
+
+/**
+ * A simple partitioner class for test purposes.
+ * The key is an int string; messages are distributed across the partitions
+ * by key modulo the partition count, so with two partitions one receives
+ * the even keys and the other the odd keys.
+ */
+public class KafkaTestPartitioner implements Partitioner
+{
+ public KafkaTestPartitioner(VerifiableProperties props)
+ {
+
+ }
+
+ public KafkaTestPartitioner()
+ {
+
+ }
+
+ @Override
+ public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster)
+ {
+ int num_partitions = cluster.partitionsForTopic(topic).size();
+ return Integer.parseInt((String)key) % num_partitions;
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+
+ @Override
+ public void configure(Map<String, ?> map)
+ {
+
+ }
+}
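As a worked example of the routing rule (key values assumed): with two partitions, key "6" maps to 6 % 2 = partition 0, while key "7" maps to 7 % 2 = partition 1, so even and odd keys land on separate partitions.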
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
----------------------------------------------------------------------
diff --git a/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
new file mode 100644
index 0000000..322f070
--- /dev/null
+++ b/kafka/kafka09/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * A Kafka producer for testing.
+ */
+public class KafkaTestProducer implements Runnable
+{
+ // private static final Logger logger = LoggerFactory.getLogger(KafkaTestProducer.class);
+ private final Producer<String, String> producer;
+ private final Producer<String, String> producer1;
+ private final String topic;
+ private int sendCount = 20;
+ // to generate a random int as a key for partition
+ private final Random rand = new Random();
+ private boolean hasPartition = false;
+ private boolean hasMultiCluster = false;
+ private List<String> messages;
+
+ // http://kafka.apache.org/documentation.html#producerconfigs
+ private String ackType = "1";
+
+ public int getSendCount()
+ {
+ return sendCount;
+ }
+
+ public void setSendCount(int sendCount)
+ {
+ this.sendCount = sendCount;
+ }
+
+ public void setMessages(List<String> messages)
+ {
+ this.messages = messages;
+ }
+
+ private Properties createProducerConfig(int cid)
+ {
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaTestPartitioner.class.getName());
+ String brokerList = "localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid];
+ brokerList += hasPartition ? (",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid]) : "";
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+ props.setProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG, "20000");
+ props.setProperty(ProducerConfig.ACKS_CONFIG, getAckType());
+ props.setProperty(ProducerConfig.RETRIES_CONFIG, "1");
+ props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
+
+ return props;
+ }
+
+ public KafkaTestProducer(String topic)
+ {
+ this(topic, false);
+ }
+
+ public KafkaTestProducer(String topic, boolean hasPartition, boolean hasMultiCluster)
+ {
+ // Uses KafkaTestPartitioner to route messages by key.
+ // Both keys and messages are Strings.
+ this.topic = topic;
+ this.hasPartition = hasPartition;
+ this.hasMultiCluster = hasMultiCluster;
+ producer = new KafkaProducer<>(createProducerConfig(0));
+ if (hasMultiCluster) {
+ producer1 = new KafkaProducer<>(createProducerConfig(1));
+ } else {
+ producer1 = null;
+ }
+ }
+
+ public KafkaTestProducer(String topic, boolean hasPartition)
+ {
+ this(topic, hasPartition, false);
+ }
+
+ private transient List<Future<RecordMetadata>> sendTasks = Lists.newArrayList();
+
+ private void generateMessages()
+ {
+ // Create dummy message
+ int messageNo = 1;
+ while (messageNo <= sendCount) {
+ String messageStr = "_" + messageNo++;
+ int k = rand.nextInt(100);
+ sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + k, "c1" + messageStr)));
+ if (hasMultiCluster && messageNo <= sendCount) {
+ messageStr = "_" + messageNo++;
+ sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + k, "c2" + messageStr)));
+ }
+ // logger.debug(String.format("Producing %s", messageStr));
+ }
+ // produce the end tuple to let the test input operator know it's done producing messages
+ for (int i = 0; i < (hasPartition ? 2 : 1); ++i) {
+ sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE)));
+ if (hasMultiCluster) {
+ sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE)));
+ }
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ if (messages == null) {
+ generateMessages();
+ } else {
+ for (String msg : messages) {
+ sendTasks.add(producer.send(new ProducerRecord<>(topic, "", msg)));
+ }
+ }
+
+ producer.flush();
+ if (producer1 != null) {
+ producer1.flush();
+ }
+
+ try {
+ for (Future<RecordMetadata> task : sendTasks) {
+ task.get(30, TimeUnit.SECONDS);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ close();
+ }
+
+ public void close()
+ {
+ producer.close();
+ if (producer1 != null) {
+ producer1.close();
+ }
+ }
+
+ public String getAckType()
+ {
+ return ackType;
+ }
+
+ public void setAckType(String ackType)
+ {
+ this.ackType = ackType;
+ }
+} // End of KafkaTestProducer
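For reference, this mirrors how the input operator tests drive the producer (a sketch assuming the single-cluster, single-partition case; the topic name is an assumption):

  KafkaTestProducer p = new KafkaTestProducer("testtopic0", false, false);
  p.setSendCount(20);       // 20 generated messages plus one END_TUPLE marker per broker
  Thread t = new Thread(p);
  t.start();
  t.join();                 // run() flushes, waits on the sends, then closes the producer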
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka09/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/kafka/kafka09/src/test/resources/log4j.properties b/kafka/kafka09/src/test/resources/log4j.properties
new file mode 100644
index 0000000..71ac284
--- /dev/null
+++ b/kafka/kafka09/src/test/resources/log4j.properties
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+#log4j.logger.org=INFO
+
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
+
+log4j.logger.org.apache.kafka=WARN
+log4j.logger.kafka.consumer=WARN
+log4j.logger.kafka=WARN
+log4j.logger.org.apache.zookeeper=WARN
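
Note: the CONSOLE threshold above is resolved through log4j's variable substitution, which consults JVM system properties before the properties file itself, so the in-file test.log.console.threshold=DEBUG is only the default; a quieter test run can be requested with, e.g., -Dtest.log.console.threshold=WARN on the test JVM.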
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/kafka/pom.xml b/kafka/pom.xml
index 5a95067..9446e9e 100755
--- a/kafka/pom.xml
+++ b/kafka/pom.xml
@@ -28,9 +28,9 @@
<version>3.8.0-SNAPSHOT</version>
</parent>
- <artifactId>malhar-kafka</artifactId>
+ <artifactId>malhar-kafka-connectors</artifactId>
<name>Apache Apex Malhar Kafka Support</name>
- <packaging>jar</packaging>
+ <packaging>pom</packaging>
<properties>
<checkstyle.console>false</checkstyle.console>
@@ -52,171 +52,142 @@
</execution>
</executions>
</plugin>
- <!-- create resource directory for xml javadoc-->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <executions>
- <execution>
- <id>createJavadocDirectory</id>
- <phase>generate-resources</phase>
- <configuration>
- <tasks>
- <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- </tasks>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
+ <!-- create resource directory for xml javadoc-->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>createJavadocDirectory</id>
+ <phase>generate-resources</phase>
+ <configuration>
+ <tasks>
+ <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+ <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- generate javadoc -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <executions>
+ <!-- generate xml javadoc -->
+ <execution>
+ <id>xml-doclet</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>javadoc</goal>
+ </goals>
+ <configuration>
+ <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+ <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+ <useStandardDocletOptions>false</useStandardDocletOptions>
+ <docletArtifact>
+ <groupId>com.github.markusbernhardt</groupId>
+ <artifactId>xml-doclet</artifactId>
+ <version>1.0.4</version>
+ </docletArtifact>
+ </configuration>
+ </execution>
+ <!-- generate default javadoc jar with custom tags -->
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <skip>true</skip>
+ <tags>
+ <tag>
+ <name>customTag1</name>
+ <placement>a</placement>
+ <head>Custom Tag One:</head>
+ </tag>
+ <tag>
+ <name>customTag2</name>
+ <placement>a</placement>
+ <head>Custom Tag Two:</head>
+ </tag>
+ <tag>
+ <name>customTag3</name>
+ <placement>a</placement>
+ <head>Custom Tag Three:</head>
+ </tag>
+ </tags>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>xml-maven-plugin</artifactId>
+ <version>1.0</version>
+ <executions>
+ <execution>
+ <id>transform-xmljavadoc</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>transform</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <transformationSets>
+ <transformationSet>
+ <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+ <includes>
+ <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+ </includes>
+ <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+ <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+ </transformationSet>
+ </transformationSets>
+ </configuration>
+ </plugin>
+ <!-- copy xml javadoc to class jar -->
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.6</version>
+ <executions>
+ <execution>
+ <id>copy-resources</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${basedir}/target/classes</outputDirectory>
+ <resources>
+ <resource>
+ <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+ <includes>
+ <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+ </includes>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <argLine>-Xmx2048m</argLine>
+ </configuration>
</plugin>
- <!-- generate javdoc -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <executions>
- <!-- generate xml javadoc -->
- <execution>
- <id>xml-doclet</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>javadoc</goal>
- </goals>
- <configuration>
- <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
- <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
- <useStandardDocletOptions>false</useStandardDocletOptions>
- <docletArtifact>
- <groupId>com.github.markusbernhardt</groupId>
- <artifactId>xml-doclet</artifactId>
- <version>1.0.4</version>
- </docletArtifact>
- </configuration>
- </execution>
- <!-- generate default javadoc jar with custom tags -->
- <execution>
- <id>attach-sources</id>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <skip>true</skip>
- <tags>
- <tag>
- <name>customTag1</name>
- <placement>a</placement>
- <head>Custom Tag One:</head>
- </tag>
- <tag>
- <name>customTag2</name>
- <placement>a</placement>
- <head>Custom Tag two:</head>
- </tag>
- <tag>
- <name>customTag3</name>
- <placement>a</placement>
- <head>Custom Tag three:</head>
- </tag>
- </tags>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>xml-maven-plugin</artifactId>
- <version>1.0</version>
- <executions>
- <execution>
- <id>transform-xmljavadoc</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>transform</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <transformationSets>
- <transformationSet>
- <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
- <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
- </transformationSet>
- </transformationSets>
- </configuration>
- </plugin>
- <!-- copy xml javadoc to class jar -->
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- <version>2.6</version>
- <executions>
- <execution>
- <id>copy-resources</id>
- <phase>process-resources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${basedir}/target/classes</outputDirectory>
- <resources>
- <resource>
- <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <filtering>true</filtering>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <argLine>-Xmx2048m</argLine>
- </configuration>
- </plugin>
</plugins>
</build>
<dependencies>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>0.9.0.0</version>
- <optional>true</optional>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>0.9.0.0</version>
- </dependency>
- <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>malhar-library</artifactId>
<version>${project.version}</version>
@@ -234,12 +205,11 @@
<version>${apex.core.version}</version>
<type>jar</type>
</dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>0.9.0.0</version>
- <classifier>test</classifier>
- <scope>test</scope>
- </dependency>
</dependencies>
+
+ <modules>
+ <module>kafka-common</module>
+ <module>kafka09</module>
+ <module>kafka010</module>
+ </modules>
</project>
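
Note: with this change the kafka module becomes a pure aggregator (packaging switches from jar to pom): the Kafka 0.9 client dependencies (kafka_2.11 and kafka-clients, both 0.9.0.0) move out of the shared POM into the version-specific child modules, while the javadoc/xml-doclet/resource-copy/surefire plugin configuration stays here for the children to inherit. Projects that previously depended on malhar-kafka would now depend on the child module matching their broker version (kafka09 or kafka010); the child artifactIds are defined in the module POMs, which are not part of this hunk.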