You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/07/07 17:01:39 UTC
[1/5] apex-malhar git commit: APEXMALHAR-2076 #resolve #comment add
AbstractTupleUniqueExactlyOnceKafkaOutputOperator
Repository: apex-malhar
Updated Branches:
refs/heads/master c4a11299b -> cc9d50366
APEXMALHAR-2076 #resolve #comment add AbstractTupleUniqueExactlyOnceKafkaOutputOperator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/33a5c2ec
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/33a5c2ec
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/33a5c2ec
Branch: refs/heads/master
Commit: 33a5c2ec95c9ee6f33023ea4ae82d156a140cb25
Parents: 72de840
Author: brightchen <br...@datatorrent.com>
Authored: Wed May 25 13:01:26 2016 -0700
Committer: brightchen <br...@datatorrent.com>
Committed: Thu Jun 2 11:18:21 2016 -0700
----------------------------------------------------------------------
.../kafka/AbstractKafkaOutputOperator.java | 2 +-
...pleUniqueExactlyOnceKafkaOutputOperator.java | 610 +++++++++++++++++++
.../contrib/kafka/KafkaMetadataUtil.java | 121 +++-
.../datatorrent/contrib/kafka/KafkaUtil.java | 358 +++++++++++
...upleUniqueExactlyOnceOutputOperatorTest.java | 512 ++++++++++++++++
.../contrib/kafka/KafkaUtilTester.java | 128 ++++
contrib/src/test/resources/log4j.properties | 1 +
7 files changed, 1713 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
index f0835c4..8003669 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
@@ -100,7 +100,7 @@ public abstract class AbstractKafkaOutputOperator<K, V> implements Operator
return new ProducerConfig(configProperties);
}
-
+
public Producer<K, V> getProducer()
{
return producer;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java
new file mode 100644
index 0000000..15fea37
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java
@@ -0,0 +1,610 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.Pair;
+
+import kafka.api.OffsetRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.StringDecoder;
+
+/**
+ * Assumptions: - assume the value of incoming tuples are not duplicate(at least
+ * in one window) among all operator partitions. - assume one Kafka partition
+ * can be written by multiple operator partitions at the same time - assume the
+ * the Kafka partition was decided by tuple value itself( not depended on
+ * operator partition)
+ *
+ * Notes: - the order of data could be changed when replay. - the data could go
+ * to the other partition when replay. For example if the upstream operator
+ * failed.
+ *
+ * Implementation: for each Kafka partition, load minimum last window and the
+ * minimum offset of the last window of all operator partitions. And then load
+ * the tuples from Kafka based on this minimum offset. When processing tuple, if
+ * the window id is less than the minimum last window, just ignore the tuple. If
+ * window id equals loaded minimum window id, and tuple equals any of loaded
+ * tuple, ignore it. Else, send to Kafka
+ *
+ * @displayName Abstract Tuple Unique Exactly Once Kafka Output
+ * @category Messaging
+ * @tags output operator
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractTupleUniqueExactlyOnceKafkaOutputOperator<T, K, V>
+ extends AbstractKafkaOutputOperator<K, V>
+{
+ public static final String DEFAULT_CONTROL_TOPIC = "ControlTopic";
+ protected transient int partitionNum = 1;
+
+ /**
+ * allow client set the partitioner as partitioner may need some attributes
+ */
+ protected kafka.producer.Partitioner partitioner;
+
+ protected transient int operatorPartitionId;
+
+ protected String controlTopic = DEFAULT_CONTROL_TOPIC;
+
+ //The control info includes the time, use this time to track the head of control info we care.
+ protected int controlInfoTrackBackTime = 120000;
+
+ /**
+ * max number of offset need to check
+ */
+ protected int maxNumOffsetsOfControl = 1000;
+
+ protected String controlProducerProperties;
+ protected Set<String> brokerSet;
+
+ protected transient long currentWindowId;
+
+ /**
+ * the map from Kafka partition id to the control offset. this one is
+ * checkpointed and as the start offset to load the recovery control
+ * information Note: this only keep the information of this operator
+ * partition.
+ */
+ protected transient Map<Integer, Long> partitionToLastControlOffset = Maps.newHashMap();
+
+ /**
+ * keep the minimal last window id for recovery. If only one partition
+ * crashed, it is ok just use the last window id of this operator partition as
+ * the recovery window id If all operator partitions crashed, should use the
+ * minimal last window id as the recovery window id, as the data may go to the
+ * other partitions. But as the operator can't distinguish which is the case.
+ * use the most general one.
+ */
+ protected transient long minRecoveryWindowId = -2;
+ protected transient long maxRecoveryWindowId = -2;
+
+ /**
+ * A map from Kafka partition id to lastMessages writtten to this kafka
+ * partition. This information was loaded depends on the
+ * RecoveryControlInfo.kafkaPartitionIdToOffset
+ */
+ protected transient Map<Integer, List<Pair<byte[], byte[]>>> partitionToLastMsgs = Maps.newHashMap();
+
+ /**
+ * The messages are assume to written to the kafka partition decided by
+ * tupleToKeyValue(T tuple) and partitioner. But it also depended on the
+ * system. for example, it could be only one partition when create topic.
+ * Don't distinguish kafka partitions if partition is not reliable.
+ */
+ protected transient Set<Pair<byte[], byte[]>> totalLastMsgs = Sets.newHashSet();
+
+ protected transient RecoveryControlInfo controlInfo = new RecoveryControlInfo();
+ protected transient Producer<String, String> controlDataProducer;
+ protected transient StringDecoder controlInfoDecoder;
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ getBrokerSet();
+
+ super.setup(context);
+ controlInfoDecoder = new StringDecoder(null);
+
+ operatorPartitionId = context.getId();
+
+ controlDataProducer = new Producer<String, String>(createKafkaControlProducerConfig());
+
+ if (partitioner == null) {
+ createDefaultPartitioner();
+ }
+
+ loadControlData();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ currentWindowId = windowId;
+ }
+
+ /**
+ * Implement Operator Interface.
+ */
+ @Override
+ public void endWindow()
+ {
+ //we'd better flush the cached tuples, but Kafka 0.8.1 doesn't support flush.
+ //keep the control information of this operator partition to control topic
+ saveControlData();
+ }
+
+ protected void createDefaultPartitioner()
+ {
+ try {
+ String className = (String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_PARTITIONER);
+ if (className != null) {
+ partitioner = (kafka.producer.Partitioner)Class.forName(className).newInstance();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to initialize partitioner", e);
+ }
+ }
+
+ /**
+ * load control data OUTPUT: lastMsgs and partitionToMinLastWindowId
+ */
+ protected void loadControlData()
+ {
+ long loadDataTime = System.currentTimeMillis();
+
+ final String clientNamePrefix = getClientNamePrefix();
+ Map<Integer, SimpleConsumer> consumers = KafkaUtil.createSimpleConsumers(clientNamePrefix, brokerSet, controlTopic);
+ if (consumers == null || consumers.size() != 1) {
+ logger.error("The consumer for recovery information was not expected. {}", consumers);
+ return;
+ }
+ final SimpleConsumer consumer = consumers.get(0);
+ if (consumer == null) {
+ logger.error("No consumer for recovery information.");
+ return;
+ }
+
+ long latestOffset = KafkaMetadataUtil.getLastOffset(consumer, controlTopic, 0, OffsetRequest.LatestTime(),
+ KafkaMetadataUtil.getClientName(clientNamePrefix, controlTopic, 0));
+ logger.debug("latestOffsets: {}", latestOffset);
+ if (latestOffset <= 0) {
+ return;
+ }
+
+ int batchMessageSize = 100;
+ List<Pair<byte[], byte[]>> messages = Lists.newArrayList();
+
+ boolean isControlMessageEnough = false;
+ Map<Integer, RecoveryControlInfo> operatorPartitionIdToLastControlInfo = Maps.newHashMap();
+
+ while (latestOffset > 0 && !isControlMessageEnough) {
+ long startOffset = latestOffset - batchMessageSize + 1;
+ if (startOffset < 0) {
+ startOffset = 0;
+ }
+
+ //read offsets as batch and handle them.
+ messages.clear();
+ KafkaUtil.readMessagesBetween(consumer, KafkaMetadataUtil.getClientName(clientNamePrefix, controlTopic, 0),
+ controlTopic, 0, startOffset, latestOffset - 1, messages, 3);
+ for (Pair<byte[], byte[]> message : messages) {
+ //handle the message; we have to handle all the message.
+ RecoveryControlInfo rci = RecoveryControlInfo.fromString((String)controlInfoDecoder.fromBytes(message.second));
+ isControlMessageEnough = (loadControlInfoIntermedia(rci, loadDataTime,
+ operatorPartitionIdToLastControlInfo) == 0);
+
+ if (isControlMessageEnough) {
+ break;
+ }
+ }
+
+ latestOffset = startOffset - 1;
+ }
+
+ loadRecoveryWindowId(operatorPartitionIdToLastControlInfo);
+ loadLastMessages(operatorPartitionIdToLastControlInfo);
+ }
+
+ /**
+ * load the recovery window id. right now use the minimal window id as the
+ * recovery window id Different Operator partitions maybe crashed at different
+ * window. use the minimal window of all operator partitions as the window for
+ * recovery.
+ *
+ * @param operatorPartitionIdToLastWindowId
+ */
+ protected void loadRecoveryWindowId(Map<Integer, RecoveryControlInfo> operatorPartitionIdToLastControlInfo)
+ {
+ for (RecoveryControlInfo rci : operatorPartitionIdToLastControlInfo.values()) {
+ if (minRecoveryWindowId < 0 || rci.windowId < minRecoveryWindowId) {
+ minRecoveryWindowId = rci.windowId;
+ }
+ if (maxRecoveryWindowId < 0 || rci.windowId > maxRecoveryWindowId) {
+ maxRecoveryWindowId = rci.windowId;
+ }
+ }
+ }
+
+ /**
+ * load control information from intermedia to
+ *
+ * @param operatorPartitionIdToLastWindowId
+ * @param operatorToKafkaToOffset
+ */
+ protected void loadLastMessages(Map<Integer, RecoveryControlInfo> operatorPartitionIdToLastControlInfo)
+ {
+ partitionToLastControlOffset.clear();
+
+ for (Map.Entry<Integer, RecoveryControlInfo> entry : operatorPartitionIdToLastControlInfo.entrySet()) {
+ RecoveryControlInfo rci = entry.getValue();
+ if (rci.windowId == this.minRecoveryWindowId) {
+ //get the minimal offset
+ for (Map.Entry<Integer, Long> kafkaPartitionEntry : rci.kafkaPartitionIdToOffset.entrySet()) {
+ Long offset = partitionToLastControlOffset.get(kafkaPartitionEntry.getKey());
+ if (offset == null || offset > kafkaPartitionEntry.getValue()) {
+ partitionToLastControlOffset.put(kafkaPartitionEntry.getKey(), kafkaPartitionEntry.getValue());
+ }
+ }
+ }
+ }
+
+ partitionToLastMsgs.clear();
+
+ KafkaUtil.readMessagesAfterOffsetTo(getClientNamePrefix(), brokerSet, getTopic(), partitionToLastControlOffset,
+ partitionToLastMsgs);
+
+ loadTotalLastMsgs();
+ }
+
+ /**
+ * load Total Last Messages from partitionToLastMsgs;
+ */
+ protected void loadTotalLastMsgs()
+ {
+ totalLastMsgs.clear();
+ if (partitionToLastMsgs == null || partitionToLastMsgs.isEmpty()) {
+ return;
+ }
+ for (List<Pair<byte[], byte[]>> msgs : partitionToLastMsgs.values()) {
+ totalLastMsgs.addAll(msgs);
+ }
+ }
+
+ protected int loadControlInfoIntermedia(RecoveryControlInfo controlInfo, long loadDataTime,
+ Map<Integer, RecoveryControlInfo> operatorPartitionIdToLastControlInfo)
+ {
+ if (controlInfo.generateTime + controlInfoTrackBackTime < loadDataTime) {
+ return 0;
+ }
+
+ //The record should be in ascent order, so the later should override the previous
+ operatorPartitionIdToLastControlInfo.put(controlInfo.partitionIdOfOperator, controlInfo);
+
+ return 1;
+ }
+
+ /**
+ * Current implementation we can get the number of operator partitions. So we
+ * we use the controlInfoTrackBackTime to control the trace back of control
+ * information.
+ *
+ * @param controlInfo
+ * @param loadDataTime
+ * @param operatorPartitionIdToLastWindowId
+ * @param operatorToKafkaToOffset
+ * @return 0 if control information is enough and don't need to load any more
+ */
+ protected int loadControlInfoIntermedia(RecoveryControlInfo controlInfo, long loadDataTime,
+ Map<Integer, Long> operatorPartitionIdToLastWindowId, Map<Integer, Map<Integer, Long>> operatorToKafkaToOffset)
+ {
+ if (controlInfo.generateTime + controlInfoTrackBackTime < loadDataTime) {
+ return 0;
+ }
+
+ //The record should be in ascent order, so the later should override the previous
+ operatorPartitionIdToLastWindowId.put(controlInfo.partitionIdOfOperator, controlInfo.windowId);
+ operatorToKafkaToOffset.put(controlInfo.partitionIdOfOperator, controlInfo.kafkaPartitionIdToOffset);
+
+ return 1;
+ }
+
+ /**
+ * save the control data. each operator partition only save its control data
+ */
+ protected void saveControlData()
+ {
+ controlInfo.generateTime = System.currentTimeMillis();
+ controlInfo.partitionIdOfOperator = operatorPartitionId;
+ controlInfo.windowId = this.currentWindowId;
+ if (controlInfo.kafkaPartitionIdToOffset == null) {
+ controlInfo.kafkaPartitionIdToOffset = Maps.newHashMap();
+ } else {
+ controlInfo.kafkaPartitionIdToOffset.clear();
+ }
+ KafkaMetadataUtil.getLastOffsetsTo(getClientNamePrefix(), brokerSet, getTopic(),
+ controlInfo.kafkaPartitionIdToOffset);
+
+ //send to control topic
+ controlDataProducer.send(new KeyedMessage<String, String>(getControlTopic(), null, 0, controlInfo.toString()));
+ }
+
+ protected String getClientNamePrefix()
+ {
+ return getClass().getName().replace('$', '.');
+ }
+
+
+ protected Set<String> getBrokerSet()
+ {
+ if (brokerSet == null) {
+ brokerSet = Sets.newHashSet((String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST));
+ }
+ return brokerSet;
+ }
+
+ /**
+ * This input port receives tuples that will be written out to Kafka.
+ */
+ public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
+ {
+ @Override
+ public void process(T tuple)
+ {
+ processTuple(tuple);
+ }
+ };
+
+ /**
+ * separate to a method to give sub-class chance to override
+ *
+ * @param tuple
+ */
+ protected void processTuple(T tuple)
+ {
+ Pair<K, V> keyValue = tupleToKeyValue(tuple);
+ final int pid = getPartitionKey(keyValue.first);
+
+ if (!skipTuple(pid, keyValue)) {
+ getProducer().send(new KeyedMessage<K, V>(getTopic(), keyValue.first, pid, keyValue.second));
+ sendCount++;
+ }
+ }
+
+ protected boolean skipTuple(int partitionId, Pair<K, V> msg)
+ {
+ if (currentWindowId <= minRecoveryWindowId) {
+ return true;
+ }
+ if (currentWindowId > maxRecoveryWindowId + 1) {
+ return false;
+ }
+
+ return isDuplicateTuple(partitionId, msg);
+ }
+
+ protected boolean isDuplicateTuple(int partitionId, Pair<K, V> msg)
+ {
+ Collection<Pair<byte[], byte[]>> lastMsgs = partitionToLastMsgs.get(partitionId);
+
+ //check depended on the partition only
+ if (lastMsgs == null || lastMsgs.isEmpty()) {
+ lastMsgs = totalLastMsgs;
+ }
+
+ for (Pair<byte[], byte[]> cachedMsg : lastMsgs) {
+ if (equals(cachedMsg, msg)) {
+ return true;
+ }
+ }
+ return false;
+
+ }
+
+ protected boolean equals(Pair<byte[], byte[]> cachedMsg, Pair<K, V> msg)
+ {
+ if (cachedMsg.first == null ^ msg.first == null) {
+ return false;
+ }
+ if (cachedMsg.second == null ^ msg.second == null) {
+ return false;
+ }
+
+ if (cachedMsg.first == null && msg.first == null && cachedMsg.second == null && msg.second == null) {
+ return true;
+ }
+
+ if (!equals(cachedMsg.first, msg.first)) {
+ return false;
+ }
+
+ return equals(cachedMsg.second, msg.second);
+ }
+
+ /**
+ *
+ * @param bytes
+ * @param value
+ * @return
+ */
+ protected abstract <M> boolean equals(byte[] bytes, M value);
+
+ /**
+ * get the partition key. for 0.8.1, If a partition key is provided it will
+ * override the key for the purpose of partitioning but will not be stored.
+ *
+ * @return
+ */
+ protected int getPartitionKey(K key)
+ {
+ if (partitioner != null) {
+ return partitioner.partition(key, partitionNum);
+ }
+
+ if (key != null) {
+ return key.hashCode();
+ }
+
+ //stick to the Kafka partition, so can't use round robbin
+ return 0;
+ }
+
+ /**
+ * setup the configuration for control producer
+ *
+ * @return
+ */
+ protected ProducerConfig createKafkaControlProducerConfig()
+ {
+ if (controlProducerProperties == null || controlProducerProperties.isEmpty()) {
+ controlProducerProperties = getProducerProperties();
+ }
+
+ Properties prop = new Properties();
+ for (String propString : controlProducerProperties.split(",")) {
+ if (!propString.contains("=")) {
+ continue;
+ }
+ String[] keyVal = StringUtils.trim(propString).split("=");
+ prop.put(StringUtils.trim(keyVal[0]), StringUtils.trim(keyVal[1]));
+ }
+
+ //only support String encoder now, overwrite
+ prop.setProperty("serializer.class", "kafka.serializer.StringEncoder");
+ prop.setProperty("key.serializer.class", "kafka.serializer.StringEncoder");
+
+ Properties configProperties = this.getConfigProperties();
+ configProperties.putAll(prop);
+
+ return new ProducerConfig(configProperties);
+ }
+
+ /**
+ * Tell the operator how to convert a input tuple to a kafka key value pair
+ *
+ * @param tuple
+ * @return A kafka key value pair.
+ */
+ protected abstract Pair<K, V> tupleToKeyValue(T tuple);
+
+ public kafka.producer.Partitioner getPartitioner()
+ {
+ return partitioner;
+ }
+
+ public void setPartitioner(kafka.producer.Partitioner partitioner)
+ {
+ this.partitioner = partitioner;
+ }
+
+ public String getControlTopic()
+ {
+ return controlTopic;
+ }
+
+ public void setControlTopic(String controlTopic)
+ {
+ this.controlTopic = controlTopic;
+ }
+
+ public String getControlProducerProperties()
+ {
+ return controlProducerProperties;
+ }
+
+ public void setControlProducerProperties(String controlProducerProperties)
+ {
+ this.controlProducerProperties = controlProducerProperties;
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractExactlyOnceKafkaOutputOperator.class);
+
+ /**
+ * This class used to keep the recovery information
+ *
+ */
+ protected static class RecoveryControlInfo
+ {
+ protected static final String SEPERATOR = "#";
+ protected int partitionIdOfOperator;
+ protected long generateTime;
+ protected long windowId;
+ protected Map<Integer, Long> kafkaPartitionIdToOffset;
+ //( operatorPartitionId => ( lastWindowId, (KafkaPartitionId => offset) ) )
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(partitionIdOfOperator).append(SEPERATOR).append(generateTime).append(SEPERATOR).append(windowId);
+ sb.append(SEPERATOR).append(kafkaPartitionIdToOffset);
+ return sb.toString();
+ }
+
+ public static RecoveryControlInfo fromString(String str)
+ {
+ if (str == null || str.isEmpty()) {
+ throw new IllegalArgumentException("Input parameter is null or empty.");
+ }
+ String[] fields = str.split(SEPERATOR);
+ if (fields == null || fields.length != 4) {
+ throw new IllegalArgumentException(
+ "Invalid input String: \"" + str + "\", " + "expected fields seperated by '" + SEPERATOR + "'");
+ }
+
+ RecoveryControlInfo rci = new RecoveryControlInfo();
+ rci.partitionIdOfOperator = Integer.valueOf(fields[0]);
+ rci.generateTime = Long.valueOf(fields[1]);
+ rci.windowId = Long.valueOf(fields[2]);
+
+ String mapString = fields[3].trim();
+ if (mapString.startsWith("{") && mapString.endsWith("}")) {
+ mapString = mapString.substring(1, mapString.length() - 1);
+ }
+ Map<String, String> idToOffsetAsString = Splitter.on(",").withKeyValueSeparator("=").split(mapString);
+ rci.kafkaPartitionIdToOffset = Maps.newHashMap();
+ for (Map.Entry<String, String> entry : idToOffsetAsString.entrySet()) {
+ rci.kafkaPartitionIdToOffset.put(Integer.valueOf(entry.getKey()), Long.valueOf(entry.getValue()));
+ }
+ return rci;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
index f6057cd..5f4d4c4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
@@ -28,11 +28,10 @@ import java.util.Map;
import java.util.Set;
import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
+import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Maps;
import com.google.common.collect.Maps.EntryTransformer;
@@ -50,6 +49,8 @@ import kafka.javaapi.consumer.SimpleConsumer;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
+import scala.collection.JavaConversions;
+
/**
* A util class used to retrieve all the metadatas for partitions/topics
* Every method in the class creates a temporary simple kafka consumer and
@@ -69,7 +70,7 @@ public class KafkaMetadataUtil
// A temporary client used to retrieve the metadata of topic/partition etc
private static final String mdClientId = "Kafka_Metadata_Lookup_Client";
- private static final int timeout=10000;
+ private static final int timeout = 10000;
//buffer size for MD lookup client is 128k should be enough for most cases
private static final int bufferSize = 128 * 1024;
@@ -95,20 +96,23 @@ public class KafkaMetadataUtil
* @return Get the partition metadata list for the specific topic via the brokers
* null if topic is not found
*/
- public static Map<String, List<PartitionMetadata>> getPartitionsForTopic(SetMultimap<String, String> brokers, final String topic)
+ public static Map<String, List<PartitionMetadata>> getPartitionsForTopic(SetMultimap<String, String> brokers,
+ final String topic)
{
- return Maps.transformEntries(brokers.asMap(), new EntryTransformer<String, Collection<String>, List<PartitionMetadata>>(){
- @Override
- public List<PartitionMetadata> transformEntry(String key, Collection<String> bs)
- {
- return getPartitionsForTopic(new HashSet<String>(bs), topic);
- }});
+ // Each key's broker collection is resolved independently through the Set-based overload.
+ // NOTE(review): Maps.transformEntries returns a view, so every value lookup on the returned map re-runs the
+ // metadata query; callers reading it repeatedly may want to copy it first.
+ return Maps.transformEntries(brokers.asMap(),
+ new EntryTransformer<String, Collection<String>, List<PartitionMetadata>>()
+ {
+ @Override
+ public List<PartitionMetadata> transformEntry(String key, Collection<String> bs)
+ {
+ return getPartitionsForTopic(new HashSet<String>(bs), topic);
+ }
+ });
}
-
- public static Set<String> getBrokers(Set<String> zkHost){
-
- ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ',') ,30000, 30000, ZKStringSerializer$.MODULE$);
+ public static Set<String> getBrokers(Set<String> zkHost)
+ {
+ ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ','), 30000, 30000, ZKStringSerializer$.MODULE$);
Set<String> brokerHosts = new HashSet<String>();
for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) {
brokerHosts.add(b.getConnectionString());
@@ -149,7 +153,7 @@ public class KafkaMetadataUtil
public static TopicMetadata getTopicMetadata(Set<String> brokerSet, String topic)
{
SimpleConsumer mdConsumer = null;
- if (brokerSet == null || brokerSet == null || brokerSet.size() == 0) {
+ if (brokerSet == null || brokerSet.size() == 0) {
return null;
}
try {
@@ -191,12 +195,12 @@ public class KafkaMetadataUtil
* @param partition
* @param whichTime
* @param clientName
- * @return 0 if consumer is null at this time
+ * @return the last offset, value value should be >=0. Return <0 if consumer is null or error.
*/
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName)
{
if (consumer == null) {
- return 0;
+ return -1;
}
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
@@ -206,11 +210,92 @@ public class KafkaMetadataUtil
if (response.hasError()) {
logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
- return 0;
+ return -1;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
+
+  /**
+   * Wraps {@code kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(OffsetRequest)} for a single topic partition.
+   *
+   * @param consumer consumer used to send the request; must not be null
+   * @param clientName client id sent with the request
+   * @param topic topic name
+   * @param partitionId partition id within the topic
+   * @param time the time to query, e.g. {@code kafka.api.OffsetRequest.LatestTime()}
+   * @param maxNumOffsets maximum number of offsets requested
+   * @return the offsets returned by the broker, or null if the response reported an error
+   * @throws IllegalArgumentException if consumer is null
+   */
+  public static long[] getOffsetsBefore(SimpleConsumer consumer, String clientName, String topic, int partitionId, long time, int maxNumOffsets)
+  {
+    if (consumer == null) {
+      throw new IllegalArgumentException("consumer is not suppose to be null.");
+    }
+
+    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+    requestInfo.put(new TopicAndPartition(topic, partitionId), new PartitionOffsetRequestInfo(time, maxNumOffsets));
+    OffsetResponse response = consumer.getOffsetsBefore(
+        new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName));
+
+    if (!response.hasError()) {
+      return response.offsets(topic, partitionId);
+    }
+    logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
+    return null;
+  }
+
+
+  /**
+   * Fetches the latest offset of every partition of a topic and stores it in {@code partitionToOffset}.
+   * A temporary SimpleConsumer is opened against each partition leader and closed afterwards.
+   *
+   * @param clientNamePrefix prefix used to build the Kafka client id of each partition
+   * @param brokerSet brokers used to look up the topic metadata
+   * @param topic topic whose partitions are queried
+   * @param partitionToOffset output map from partition id to latest offset; entries of queried partitions are
+   *          overwritten
+   * @throws RuntimeException if the topic metadata cannot be retrieved, a partition has no leader, or the broker
+   *           does not return an offset for a partition
+   */
+  public static void getLastOffsetsTo(String clientNamePrefix, Set<String> brokerSet, String topic,
+      Map<Integer, Long> partitionToOffset)
+  {
+    // read last received kafka message
+    TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic);
+
+    if (tm == null) {
+      throw new RuntimeException("Failed to retrieve topic metadata");
+    }
+
+    for (PartitionMetadata pm : tm.partitionsMetadata()) {
+      final int partitionId = pm.partitionId();
+      if (pm.leader() == null) {
+        throw new RuntimeException("No leader for partition " + partitionId + " of topic " + topic);
+      }
+
+      SimpleConsumer consumer = null;
+      try {
+        String leadBroker = pm.leader().host();
+        int port = pm.leader().port();
+        final String clientName = getClientName(clientNamePrefix, topic, partitionId);
+        consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
+
+        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
+        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+        OffsetResponse response = consumer.getOffsetsBefore(request);
+
+        if (response.hasError()) {
+          logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
+          // an errored response may carry no offset (offsets(...)[0] would then throw); fail with a clear message
+          throw new RuntimeException("Failed to fetch the latest offset of partition " + partitionId + " of topic "
+              + topic + ", error code " + response.errorCode(topic, partitionId));
+        }
+        long[] offsets = response.offsets(topic, partitionId);
+        if (offsets == null || offsets.length == 0) {
+          throw new RuntimeException("No offset returned for partition " + partitionId + " of topic " + topic);
+        }
+        partitionToOffset.put(partitionId, offsets[0]);
+      } finally {
+        if (consumer != null) {
+          consumer.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds the Kafka client id of a topic partition as {@code <clientNamePrefix>_<topic>_<partitionId>}.
+   */
+  public static String getClientName(String clientNamePrefix, String topic, int partitionId)
+  {
+    StringBuilder name = new StringBuilder();
+    name.append(clientNamePrefix).append('_').append(topic).append('_').append(partitionId);
+    return name.toString();
+  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java
new file mode 100644
index 0000000..d49e462
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.kafka;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.common.util.Pair;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+
+public class KafkaUtil
+{
+ private static Logger logger = LoggerFactory.getLogger(KafkaUtil.class);
+ public static final int DEFAULT_TIMEOUT = 200;
+ public static final int DEFAULT_BUFFER_SIZE = 64 * 10240;
+ public static final int DEFAULT_FETCH_SIZE = 200;
+
+ /**
+ * read last message ( the start offset send from partitionToOffset ) of all
+ * partition to partitionToMessages
+ *
+ * @param clientNamePrefix
+ * @param brokerSet
+ * @param topic
+ * @param partitionToStartOffset
+ * @param partitionToMessages
+ */
+ public static void readMessagesAfterOffsetTo(String clientNamePrefix, Set<String> brokerSet, String topic,
+ Map<Integer, Long> partitionToStartOffset, Map<Integer, List<Pair<byte[], byte[]>>> partitionToMessages)
+ {
+ // read last received kafka message
+ TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic);
+
+ if (tm == null) {
+ throw new RuntimeException("Failed to retrieve topic metadata");
+ }
+
+ for (PartitionMetadata pm : tm.partitionsMetadata()) {
+ SimpleConsumer consumer = null;
+ try {
+ List<Pair<byte[], byte[]>> messagesOfPartition = partitionToMessages.get(pm.partitionId());
+ if (messagesOfPartition == null) {
+ messagesOfPartition = Lists.newArrayList();
+ partitionToMessages.put(pm.partitionId(), messagesOfPartition);
+ }
+
+ long startOffset = partitionToStartOffset.get(pm.partitionId()) == null ? 0
+ : partitionToStartOffset.get(pm.partitionId());
+ final String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId());
+ consumer = createSimpleConsumer(clientName, tm.topic(), pm);
+
+ //the returned lastOffset is the offset which haven't written data to.
+ long lastOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(),
+ kafka.api.OffsetRequest.LatestTime(), clientName);
+ logger.debug("lastOffset = {}", lastOffset);
+ if (lastOffset <= 0) {
+ continue;
+ }
+
+ readMessagesBetween(consumer, clientName, topic, pm.partitionId(), startOffset, lastOffset - 1,
+ messagesOfPartition);
+ } finally {
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+ }
+ }
+
+ public static void readMessagesBetween(String clientNamePrefix, Set<String> brokerSet, String topic, int partitionId,
+ long startOffset, long endOffset, List<Pair<byte[], byte[]>> messages)
+ {
+ Map<Integer, SimpleConsumer> consumers = createSimpleConsumers(clientNamePrefix, brokerSet, topic);
+ if (consumers == null) {
+ throw new RuntimeException("Can't find any consumer.");
+ }
+
+ SimpleConsumer consumer = consumers.get(partitionId);
+ if (consumer == null) {
+ throw new IllegalArgumentException("No consumer for partition: " + partitionId);
+ }
+
+ readMessagesBetween(consumer, KafkaMetadataUtil.getClientName(clientNamePrefix, topic, partitionId), topic,
+ partitionId, startOffset, endOffset, messages);
+ }
+
+ /**
+ * get A map of partition id to SimpleConsumer
+ *
+ * @param clientNamePrefix
+ * @param brokerSet
+ * @param topic
+ * @return A map of partition id to SimpleConsumer
+ */
+ public static Map<Integer, SimpleConsumer> createSimpleConsumers(String clientNamePrefix, Set<String> brokerSet,
+ String topic)
+ {
+ return createSimpleConsumers(clientNamePrefix, brokerSet, topic, DEFAULT_TIMEOUT);
+ }
+
+ /**
+ * get A map of partition id to SimpleConsumer
+ *
+ * @param clientNamePrefix
+ * @param brokerSet
+ * @param topic
+ * @param timeOut
+ * @return A map of partition id to SimpleConsumer
+ */
+ public static Map<Integer, SimpleConsumer> createSimpleConsumers(String clientNamePrefix, Set<String> brokerSet,
+ String topic, int timeOut)
+ {
+ if (clientNamePrefix == null || clientNamePrefix.isEmpty() || brokerSet == null || brokerSet.isEmpty()
+ || topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException(
+ "clientNamePrefix = " + clientNamePrefix + ", brokerSet = " + brokerSet + ", topic = " + topic);
+ }
+
+ TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic);
+
+ if (tm == null) {
+ throw new RuntimeException("Failed to retrieve topic metadata");
+ }
+
+ Map<Integer, SimpleConsumer> consumers = Maps.newHashMap();
+ for (PartitionMetadata pm : tm.partitionsMetadata()) {
+ String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId());
+ consumers.put(pm.partitionId(), createSimpleConsumer(clientName, tm.topic(), pm));
+ }
+ return consumers;
+ }
+
+ public static void readMessagesBetween(SimpleConsumer consumer, String clientName, String topic, int partitionId,
+ long startOffset, long endOffset, List<Pair<byte[], byte[]>> messages)
+ {
+ readMessagesBetween(consumer, clientName, topic, partitionId, startOffset, endOffset, messages, 1);
+ }
+
+ /**
+ * read messages of a certain partition into messages
+ *
+ * @param consumer
+ * @param clientNamePrefix
+ * @param topic
+ * @param partitionId
+ * @param startOffset
+ * inclusive
+ * @param endOffset
+ * inclusive
+ * @param messages
+ * @param tryTimesOnEmptyMessage
+ * how many times should to try when response message is empty. <=0
+ * means try forever.
+ */
+ public static void readMessagesBetween(SimpleConsumer consumer, String clientName, String topic, int partitionId,
+ long startOffset, long endOffset, List<Pair<byte[], byte[]>> messages, int tryTimesOnEmptyMessage)
+ {
+ if (startOffset < 0 || endOffset < 0 || endOffset < startOffset) {
+ throw new IllegalArgumentException(
+ "Both offset should not less than zero and endOffset should not less than startOffset. startOffset = "
+ + startOffset + ", endoffset = " + endOffset);
+ }
+
+ int readSize = 0;
+ int wantedSize = (int)(endOffset - startOffset + 1);
+
+ int triedTimesOnEmptyMessage = 0;
+ while (readSize < wantedSize
+ && (tryTimesOnEmptyMessage <= 0 || triedTimesOnEmptyMessage < tryTimesOnEmptyMessage)) {
+ logger.debug("startOffset = {}", startOffset);
+ FetchRequest req = new FetchRequestBuilder().clientId(clientName)
+ .addFetch(topic, partitionId, startOffset, DEFAULT_FETCH_SIZE).build();
+
+ FetchResponse fetchResponse = consumer.fetch(req);
+ if (fetchResponse.hasError()) {
+ logger.error(
+ "Error fetching data Offset Data the Broker. Reason: " + fetchResponse.errorCode(topic, partitionId));
+ return;
+ }
+
+ triedTimesOnEmptyMessage++;
+ ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, partitionId);
+ for (MessageAndOffset messageAndOffset : messageSet) {
+ long offset = messageAndOffset.offset();
+ logger.debug("offset = " + offset);
+
+ if (offset > endOffset || offset < startOffset) {
+ continue;
+ }
+ triedTimesOnEmptyMessage = 0;
+ startOffset = offset + 1;
+ ++readSize;
+ messages.add(kafkaMessageToPair(messageAndOffset.message()));
+ }
+ }
+ }
+
+ /**
+ * read last message of each partition into lastMessages
+ *
+ * @param clientNamePrefix
+ * @param brokerSet
+ * @param topic
+ * @param lastMessages
+ */
+ public static void readLastMessages(String clientNamePrefix, Set<String> brokerSet, String topic,
+ Map<Integer, Pair<byte[], byte[]>> lastMessages)
+ {
+ // read last received kafka message
+ TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic);
+
+ if (tm == null) {
+ throw new RuntimeException("Failed to retrieve topic metadata");
+ }
+
+ for (PartitionMetadata pm : tm.partitionsMetadata()) {
+ SimpleConsumer consumer = null;
+ try {
+ String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId());
+ consumer = createSimpleConsumer(clientName, tm.topic(), pm);
+
+ long readOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(),
+ kafka.api.OffsetRequest.LatestTime(), clientName);
+
+ FetchRequest req = new FetchRequestBuilder().clientId(clientName)
+ .addFetch(tm.topic(), pm.partitionId(), readOffset - 1, DEFAULT_FETCH_SIZE).build();
+
+ FetchResponse fetchResponse = consumer.fetch(req);
+ if (fetchResponse.hasError()) {
+ logger.error("Error fetching data Offset Data the Broker. Reason: "
+ + fetchResponse.errorCode(topic, pm.partitionId()));
+ return;
+ }
+
+ for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(tm.topic(), pm.partitionId())) {
+ lastMessages.put(pm.partitionId(), kafkaMessageToPair(messageAndOffset.message()));
+ }
+ } finally {
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+
+ }
+ }
+
+ public static Pair<byte[], byte[]> readLastMessage(String clientNamePrefix, Set<String> brokerSet, String topic,
+ int partitionId)
+ {
+ // read last received kafka message
+ TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic);
+
+ if (tm == null) {
+ throw new RuntimeException("Failed to retrieve topic metadata");
+ }
+
+ for (PartitionMetadata pm : tm.partitionsMetadata()) {
+ SimpleConsumer consumer = null;
+ try {
+ if (pm.partitionId() != partitionId) {
+ continue;
+ }
+
+ String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId());
+ consumer = createSimpleConsumer(clientName, topic, pm);
+
+ long readOffset = KafkaMetadataUtil.getLastOffset(consumer, topic, partitionId,
+ kafka.api.OffsetRequest.LatestTime(), clientName);
+
+ FetchRequest req = new FetchRequestBuilder().clientId(clientName)
+ .addFetch(tm.topic(), pm.partitionId(), readOffset - 1, DEFAULT_FETCH_SIZE).build();
+
+ FetchResponse fetchResponse = consumer.fetch(req);
+ if (fetchResponse.hasError()) {
+ logger.error("Error fetching data Offset Data the Broker. Reason: "
+ + fetchResponse.errorCode(topic, pm.partitionId()));
+ return null;
+ }
+
+ for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
+ return kafkaMessageToPair(messageAndOffset.message());
+ }
+ } finally {
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * convert Kafka message to pair
+ *
+ * @param m
+ * @return
+ */
+ public static Pair<byte[], byte[]> kafkaMessageToPair(Message m)
+ {
+ ByteBuffer payload = m.payload();
+ ByteBuffer key = m.key();
+ byte[] keyBytes = null;
+ if (key != null) {
+ keyBytes = new byte[key.limit()];
+ key.get(keyBytes);
+ }
+
+ byte[] valueBytes = new byte[payload.limit()];
+ payload.get(valueBytes);
+ return new Pair<byte[], byte[]>(keyBytes, valueBytes);
+ }
+
+ public static SimpleConsumer createSimpleConsumer(String clientName, String topic, PartitionMetadata pm)
+ {
+ return createSimpleConsumer(clientName, topic, pm, DEFAULT_TIMEOUT, DEFAULT_BUFFER_SIZE);
+ }
+
+ public static SimpleConsumer createSimpleConsumer(String clientName, String topic, PartitionMetadata pm, int timeout,
+ int bufferSize)
+ {
+ String leadBroker = pm.leader().host();
+ int port = pm.leader().port();
+ return new SimpleConsumer(leadBroker, port, timeout, bufferSize, clientName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java
new file mode 100644
index 0000000..abdcb01
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.kafka;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.Pair;
+import com.datatorrent.stram.api.OperatorDeployInfo;
+
+import kafka.producer.ProducerConfig;
+import kafka.serializer.StringDecoder;
+
+public class KafkaTupleUniqueExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase
+{
+ public static final int TUPLE_NUM_IN_ONE_WINDOW = 10;
+ public static final String topic1 = "OperatorTest1";
+ public static final String controlTopic1 = "ControlTopic1";
+
+ public static final String topic2 = "OperatorTest2";
+ public static final String controlTopic2 = "ControlTopic2";
+
+ public static final String topic3 = "OperatorTest3";
+ public static final String controlTopic3 = "ControlTopic3";
+
+ public static class TupleUniqueExactlyOnceKafkaOutputTestOperator
+ extends AbstractTupleUniqueExactlyOnceKafkaOutputOperator<Integer, String, String>
+ {
+ protected transient StringDecoder decoder = null;
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ decoder = new StringDecoder(null);
+ super.setup(context);
+ }
+
+ @Override
+ protected Pair<String, String> tupleToKeyValue(Integer tuple)
+ {
+ return new Pair<>(String.valueOf(tuple % 2), String.valueOf(tuple));
+ }
+
+ @Override
+ protected <T> boolean equals(byte[] bytes, T value)
+ {
+ if (bytes == null && value == null) {
+ return true;
+ }
+ if (value == null) {
+ return false;
+ }
+ return value.equals(decoder.fromBytes(bytes));
+ }
+
+ }
+
+ protected void createTopic(String topicName)
+ {
+ createTopic(0, topicName);
+ if (hasMultiCluster) {
+ createTopic(1, topicName);
+ }
+ }
+
+ protected ProducerConfig createKafkaControlProducerConfig()
+ {
+ return new ProducerConfig(this.getKafkaProperties());
+ }
+
+ /**
+ * This test case there are only one operator partition, and the order of data
+ * changed when recovery.
+ */
+ @Test
+ public void testOutOfOrder()
+ {
+ OperatorDeployInfo context = new OperatorDeployInfo();
+ context.id = 1;
+ int[] expectedTuple = new int[(int)(TUPLE_NUM_IN_ONE_WINDOW * 3)];
+ int tupleIndex = 0;
+ long windowId = 0;
+ {
+ //create required topics
+ createTopic(topic1);
+ createTopic(controlTopic1);
+
+ TupleUniqueExactlyOnceKafkaOutputTestOperator operator = createOperator(topic1, controlTopic1, 1);
+
+ int i = 0;
+ for (int windowCount = 0; windowCount < 2; ++windowCount) {
+ operator.beginWindow(windowId++);
+
+ for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 1); ++i) {
+ operator.processTuple(i);
+ expectedTuple[tupleIndex++] = i;
+ }
+ waitMills(500);
+ operator.endWindow();
+ }
+
+ //last window, the crash window
+ operator.beginWindow(windowId++);
+ for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2.5; ++i) {
+ operator.processTuple(i);
+ expectedTuple[tupleIndex++] = i;
+ }
+
+ //crashed now.
+ }
+
+ //let kafka message send to server
+ waitMills(1000);
+
+ {
+ //recovery
+ TupleUniqueExactlyOnceKafkaOutputTestOperator operator = new TupleUniqueExactlyOnceKafkaOutputTestOperator();
+ operator.setTopic(topic1);
+ operator.setControlTopic(controlTopic1);
+ operator.setConfigProperties(getKafkaProperties());
+
+ operator.setup(context);
+
+ //assume replay start with 2nd window, but different order
+ int i = TUPLE_NUM_IN_ONE_WINDOW;
+
+ windowId = 1;
+ operator.beginWindow(windowId++);
+ for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2; i += 2) {
+ operator.processTuple(i);
+ }
+ i = TUPLE_NUM_IN_ONE_WINDOW + 1;
+ for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2; i += 2) {
+ operator.processTuple(i);
+ }
+ waitMills(500);
+ operator.endWindow();
+
+ //3rd window, in different order
+ operator.beginWindow(windowId++);
+ i = TUPLE_NUM_IN_ONE_WINDOW * 2;
+ for (; i < TUPLE_NUM_IN_ONE_WINDOW * 3; i += 2) {
+ operator.processTuple(i);
+ if (i >= TUPLE_NUM_IN_ONE_WINDOW * 2.5) {
+ expectedTuple[tupleIndex++] = i;
+ }
+ }
+
+ i = TUPLE_NUM_IN_ONE_WINDOW * 2 + 1;
+ for (; i < TUPLE_NUM_IN_ONE_WINDOW * 3; i += 2) {
+ operator.processTuple(i);
+ if (i >= TUPLE_NUM_IN_ONE_WINDOW * 2.5) {
+ expectedTuple[tupleIndex++] = i;
+ }
+ }
+ }
+
+ int[] actualTuples = readTuplesFromKafka(topic1);
+ Assert.assertArrayEquals(expectedTuple, actualTuples);
+ }
+
+ protected TupleUniqueExactlyOnceKafkaOutputTestOperator createOperator(String topic, String controlTopic, int id)
+ {
+ TupleUniqueExactlyOnceKafkaOutputTestOperator operator = new TupleUniqueExactlyOnceKafkaOutputTestOperator();
+ operator.setTopic(topic);
+ operator.setControlTopic(controlTopic);
+
+ operator.setConfigProperties(getKafkaProperties());
+ OperatorDeployInfo context = new OperatorDeployInfo();
+ context.id = id;
+ operator.setup(context);
+
+ return operator;
+ }
+
+ /**
+ * This test case test the case the tuple go to other operator partition when
+ * recovery.
+ */
+ @Test
+ public void testDifferentPartition()
+ {
+ //hasMultiPartition = true;
+
+ int[] expectedTuples = new int[(int)(TUPLE_NUM_IN_ONE_WINDOW * 6)];
+ int tupleIndex = 0;
+ long windowId1 = 0;
+ long windowId2 = 0;
+
+ //create required topics
+ createTopic(topic2);
+ createTopic(controlTopic2);
+
+ {
+ TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic2, controlTopic2, 1);
+ TupleUniqueExactlyOnceKafkaOutputTestOperator operator2 = createOperator(topic2, controlTopic2, 2);
+ TupleUniqueExactlyOnceKafkaOutputTestOperator[] operators = new TupleUniqueExactlyOnceKafkaOutputTestOperator[] {
+ operator1, operator2 };
+
+ //send as round robin
+ int i = 0;
+ for (int windowCount = 0; windowCount < 2; ++windowCount) {
+ operator1.beginWindow(windowId1++);
+ operator2.beginWindow(windowId2++);
+
+ for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 1) * 2; ++i) {
+ operators[i % 2].processTuple(i);
+ expectedTuples[tupleIndex++] = i;
+ }
+ waitMills(500);
+ operator1.endWindow();
+ operator2.endWindow();
+ }
+
+ //last window, the crash window
+ operator1.beginWindow(windowId1++);
+ operator2.beginWindow(windowId2++);
+ for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2.5 * 2; ++i) {
+ operators[i % 2].processTuple(i);
+ expectedTuples[tupleIndex++] = i;
+ }
+
+ //crashed now.
+ }
+
+ //let kafka message send to server
+ waitMills(1000);
+ int lastTuple = tupleIndex - 1;
+ {
+ //recovery
+ TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic2, controlTopic2, 1);
+ TupleUniqueExactlyOnceKafkaOutputTestOperator operator2 = createOperator(topic2, controlTopic2, 2);
+ //tuple go to different partition
+ TupleUniqueExactlyOnceKafkaOutputTestOperator[] operators = new TupleUniqueExactlyOnceKafkaOutputTestOperator[] {
+ operator2, operator1 };
+
+ //assume replay start with 2nd window, but different order
+ int i = TUPLE_NUM_IN_ONE_WINDOW * 2;
+
+ windowId1 = 1;
+ windowId2 = 1;
+
+ //window id: 1, 2
+ for (int windowCount = 0; windowCount < 2; ++windowCount) {
+ operator1.beginWindow(windowId1++);
+ operator2.beginWindow(windowId2++);
+
+ for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 2) * 2; ++i) {
+ operators[i % 2].processTuple(i);
+ if (i > lastTuple) {
+ expectedTuples[tupleIndex++] = i;
+ }
+ }
+ waitMills(500);
+ operator1.endWindow();
+ operator2.endWindow();
+ }
+ }
+
+ int[] actualTuples = readTuplesFromKafka(topic2);
+ Arrays.sort(actualTuples);
+ Arrays.sort(expectedTuples);
+
+ assertArrayEqualsWithDetailInfo(expectedTuples, actualTuples);
+ }
+
+ /**
+ * This test case test only one operator partition crash, while the other
+ * operator partition keep on write data to the same Kafka partition.
+ */
+ @Test
+ public void testOnePartitionCrash()
+ {
+
+ int[] expectedTuples = new int[(int)(TUPLE_NUM_IN_ONE_WINDOW * 6)];
+ int tupleIndex = 0;
+ long windowId1 = 0;
+ long windowId2 = 0;
+
+ //create required topics
+ createTopic(topic3);
+ createTopic(controlTopic3);
+
+ {
+ TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic3, controlTopic3, 1);
+ TupleUniqueExactlyOnceKafkaOutputTestOperator operator2 = createOperator(topic3, controlTopic3, 2);
+ TupleUniqueExactlyOnceKafkaOutputTestOperator[] operators = new TupleUniqueExactlyOnceKafkaOutputTestOperator[] {
+ operator1, operator2 };
+
+ //send as round robin
+ int i = 0;
+ for (int windowCount = 0; windowCount < 2; ++windowCount) {
+ operator1.beginWindow(windowId1++);
+ operator2.beginWindow(windowId2++);
+
+ for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 1) * 2; ++i) {
+ operators[i % 2].processTuple(i);
+ expectedTuples[tupleIndex++] = i;
+ }
+ waitMills(500);
+ operator1.endWindow();
+ operator2.endWindow();
+ }
+
+ //operator1 crash, while operator2 alive
+ operator1.beginWindow(windowId1++);
+ //operator1 handle even number;
+ for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2.5 * 2; i += 2) {
+ operators[i % 2].processTuple(i);
+ expectedTuples[tupleIndex++] = i;
+ }
+
+ //operator1 crashed now.
+
+ //operator2 still alive, operator2 handle odd number
+ operator2.beginWindow(windowId2++);
+ i = TUPLE_NUM_IN_ONE_WINDOW * 4 + 1;
+ for (; i < TUPLE_NUM_IN_ONE_WINDOW * 3 * 2; i += 2) {
+ operator2.processTuple(i);
+ expectedTuples[tupleIndex++] = i;
+ }
+ operator2.endWindow();
+
+ }
+
+ //let kafka message send to server
+ waitMills(1000);
+
+ //operator1 recover from second window
+ int lastTuple = (int)(TUPLE_NUM_IN_ONE_WINDOW * 2.5 * 2) - 1;
+ {
+ //recovery
+ TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic3, controlTopic3, 1);
+
+ //assume replay start with 2nd window, same order
+ int i = TUPLE_NUM_IN_ONE_WINDOW * 2;
+
+ windowId1 = 1;
+
+ //window id: 1, 2
+ for (int windowCount = 0; windowCount < 2; ++windowCount) {
+ operator1.beginWindow(windowId1++);
+
+ for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 2) * 2; i += 2) {
+ operator1.processTuple(i);
+ if (i > lastTuple) {
+ expectedTuples[tupleIndex++] = i;
+ }
+ }
+ waitMills(500);
+ operator1.endWindow();
+ }
+ }
+
+ int[] actualTuples = readTuplesFromKafka(topic3);
+ Arrays.sort(actualTuples);
+ Arrays.sort(expectedTuples);
+
+ assertArrayEqualsWithDetailInfo(expectedTuples, actualTuples);
+ }
+
+ /**
+ * Test the application which using TupleUniqueExactlyOnceKafkaOutputTestOperator is launchalbe in local mode
+ */
+ @Test
+ public void testLaunchApp() throws Exception
+ {
+ Configuration conf = new Configuration(false);
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ TupleGenerateOperator generateOperator = new TupleGenerateOperator();
+ dag.addOperator("GenerateOperator", generateOperator);
+
+ TupleUniqueExactlyOnceKafkaOutputTestOperator testOperator = new TupleUniqueExactlyOnceKafkaOutputTestOperator();
+ dag.addOperator("TestOperator", testOperator);
+
+ dag.addStream("stream", generateOperator.outputPort, testOperator.inputPort);
+
+ StreamingApplication app = new StreamingApplication()
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ }
+ };
+
+ lma.prepareDAG(app, conf);
+
+ // Create local cluster
+ final LocalMode.Controller lc = lma.getController();
+ lc.run(5000);
+
+ lc.shutdown();
+ }
+
+ public static void assertArrayEqualsWithDetailInfo(int[] expectedTuples, int[] actualTuples)
+ {
+ Assert.assertTrue("Length incorrect. expected " + expectedTuples.length + "; actual " + actualTuples.length,
+ actualTuples.length == expectedTuples.length);
+ for (int i = 0; i < actualTuples.length; ++i) {
+ Assert.assertEquals("Not equal. index=" + i + ", expected=" + expectedTuples[i] + ", actual=" + actualTuples[i],
+ actualTuples[i], expectedTuples[i]);
+ }
+ }
+
+ public void waitMills(long millis)
+ {
+ try {
+ Thread.sleep(millis);
+ } catch (Exception e) {
+ //ignore
+ }
+ }
+
+ public int[] readTuplesFromKafka(String topic)
+ {
+ StringDecoder decoder = new StringDecoder(null);
+ Map<Integer, Long> partitionToStartOffset = Maps.newHashMap();
+ partitionToStartOffset.put(0, 0L);
+
+ this.waitMills(1000);
+
+ Map<Integer, List<Pair<byte[], byte[]>>> partitionToMessages = Maps.newHashMap();
+ KafkaUtil.readMessagesAfterOffsetTo("TestOperator", getBrokerSet(), topic, partitionToStartOffset,
+ partitionToMessages);
+
+ List<Pair<byte[], byte[]>> msgList = partitionToMessages.get(0);
+ int[] values = new int[msgList.size()];
+ int index = 0;
+ for (Pair<byte[], byte[]> msg : msgList) {
+ values[index++] = Integer.valueOf(decoder.fromBytes(msg.second));
+ }
+ return values;
+ }
+
+
+ protected Set<String> getBrokerSet()
+ {
+ return Sets.newHashSet((String)getKafkaProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST));
+ }
+
+ public Properties getKafkaProperties()
+ {
+ Properties props = new Properties();
+ props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
+ //props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
+ //props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder");
+ props.put("metadata.broker.list", "localhost:9092");
+ //props.setProperty("producer.type", "sync");
+ props.setProperty("producer.type", "async");
+ props.setProperty("queue.buffering.max.ms", "100");
+ props.setProperty("queue.buffering.max.messages", "5");
+ props.setProperty("batch.num.messages", "5");
+ return props;
+ }
+
+
+ public static class TupleGenerateOperator extends BaseOperator implements InputOperator
+ {
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Integer> outputPort = new DefaultOutputPort<>();
+ protected int value = 0;
+
+ @Override
+ public void emitTuples()
+ {
+ if (!outputPort.isConnected()) {
+ return;
+ }
+
+ for (int i = 0; i < 100; ++i) {
+ outputPort.emit(++value);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java
new file mode 100644
index 0000000..c27803d
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.common.util.Pair;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+public class KafkaUtilTester extends KafkaOperatorTestBase
+{
+ public static final String topic = "UtilTestTopic";
+ public static final String clientNamePrefix = "UtilTestClient";
+ public static final int DATA_SIZE = 50;
+
+ protected Producer<String, String> producer;
+ private transient Set<String> brokerSet;
+
+ public void beforeTest()
+ {
+ //Got exception when using multiple partition.
+ //java.io.FileNotFoundException: target/kafka-server-data/1/1/replication-offset-checkpoint (No such file or directory)
+ //hasMultiPartition = true;
+
+ super.beforeTest();
+ createTopic(topic);
+
+ producer = new Producer<String, String>(createKafkaProducerConfig());
+ getBrokerSet();
+
+ sendData();
+ }
+
+ public void sendData()
+ {
+ for (int i = 0; i < DATA_SIZE; ++i) {
+ producer.send(new KeyedMessage<String, String>(topic, null, "message " + i));
+ }
+
+ waitMills(1000);
+ }
+
+ @Test
+ public void testReadMessagesAfterOffsetTo()
+ {
+ Map<Integer, Long> partitionToStartOffset = Maps.newHashMap();
+ partitionToStartOffset.put(1, 0L);
+ Map<Integer, List<Pair<byte[], byte[]>>> partitionToMessages = Maps.newHashMap();
+ KafkaUtil.readMessagesAfterOffsetTo(clientNamePrefix, brokerSet, topic, partitionToStartOffset,
+ partitionToMessages);
+ final int dataSize = partitionToMessages.entrySet().iterator().next().getValue().size();
+ Assert.assertTrue("data size is: " + dataSize, dataSize == DATA_SIZE);
+ }
+
+ public void waitMills(long millis)
+ {
+ try {
+ Thread.sleep(millis);
+ } catch (Exception e) {
+ //ignore
+ }
+ }
+
+ protected void createTopic(String topicName)
+ {
+ createTopic(0, topicName);
+ if (hasMultiCluster) {
+ createTopic(1, topicName);
+ }
+ }
+
+ protected Properties getConfigProperties()
+ {
+ Properties props = new Properties();
+ props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
+ //props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
+ //props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder");
+ props.put("metadata.broker.list", "localhost:9092");
+ //props.setProperty("producer.type", "sync");
+ props.setProperty("producer.type", "async");
+ props.setProperty("queue.buffering.max.ms", "10");
+ props.setProperty("queue.buffering.max.messages", "10");
+ props.setProperty("batch.num.messages", "5");
+
+ return props;
+ }
+
+ protected ProducerConfig createKafkaProducerConfig()
+ {
+ return new ProducerConfig(getConfigProperties());
+ }
+
+ protected Set<String> getBrokerSet()
+ {
+ if (brokerSet == null) {
+ brokerSet = Sets.newHashSet((String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST));
+ }
+ return brokerSet;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/log4j.properties b/contrib/src/test/resources/log4j.properties
index 2fcbe38..cfc50cf 100644
--- a/contrib/src/test/resources/log4j.properties
+++ b/contrib/src/test/resources/log4j.properties
@@ -39,3 +39,4 @@ log4j.logger.org=info
#log4j.logger.org.apache.commons.beanutils=warn
log4j.logger.com.datatorrent=debug
log4j.logger.org.apache.apex=debug
+log4j.logger.kafka=info
[5/5] apex-malhar git commit: Merge branch 'APEXMALHAR-2133' of
github.com:brightchen/incubator-apex-malhar
Posted by hs...@apache.org.
Merge branch 'APEXMALHAR-2133' of github.com:brightchen/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/cc9d5036
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/cc9d5036
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/cc9d5036
Branch: refs/heads/master
Commit: cc9d503665983b4a214ad0a39f64a6caa448c01b
Parents: 09465e4 273b207
Author: Siyuan Hua <hs...@apache.org>
Authored: Thu Jul 7 10:00:34 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Thu Jul 7 10:00:34 2016 -0700
----------------------------------------------------------------------
.../malhar/kafka/AbstractKafkaPartitioner.java | 60 +++++++++++++-------
1 file changed, 38 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
[2/5] apex-malhar git commit: Merge branch 'APEXMALHAR-2076' of
github.com:brightchen/incubator-apex-malhar
Posted by hs...@apache.org.
Merge branch 'APEXMALHAR-2076' of github.com:brightchen/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c887d885
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c887d885
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c887d885
Branch: refs/heads/master
Commit: c887d885f5b3272666c6be0c9d5aea30a4144541
Parents: 389a2d5 33a5c2e
Author: Siyuan Hua <hs...@apache.org>
Authored: Wed Jun 29 09:05:49 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Wed Jun 29 09:05:49 2016 -0700
----------------------------------------------------------------------
.../kafka/AbstractKafkaOutputOperator.java | 2 +-
...pleUniqueExactlyOnceKafkaOutputOperator.java | 610 +++++++++++++++++++
.../contrib/kafka/KafkaMetadataUtil.java | 121 +++-
.../datatorrent/contrib/kafka/KafkaUtil.java | 358 +++++++++++
...upleUniqueExactlyOnceOutputOperatorTest.java | 512 ++++++++++++++++
.../contrib/kafka/KafkaUtilTester.java | 128 ++++
contrib/src/test/resources/log4j.properties | 1 +
7 files changed, 1713 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c887d885/contrib/src/test/resources/log4j.properties
----------------------------------------------------------------------
[4/5] apex-malhar git commit: APEXMALHAR-2133 #resolve #comment
Handle case partitionsFor() returns null
Posted by hs...@apache.org.
APEXMALHAR-2133 #resolve #comment Handle case partitionsFor() returns null
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/273b2072
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/273b2072
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/273b2072
Branch: refs/heads/master
Commit: 273b20724a796786d52235e6535303f86b35800f
Parents: 32840a2
Author: brightchen <br...@datatorrent.com>
Authored: Tue Jul 5 13:39:06 2016 -0700
Committer: brightchen <br...@datatorrent.com>
Committed: Wed Jul 6 16:50:48 2016 -0700
----------------------------------------------------------------------
.../malhar/kafka/AbstractKafkaPartitioner.java | 60 +++++++++++++-------
1 file changed, 38 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/273b2072/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index c6e47e9..772399d 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -93,39 +93,46 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
Map<String, Map<String, List<PartitionInfo>>> metadata = new HashMap<>();
- for (int i = 0; i < clusters.length; i++) {
- metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>());
- for (String topic : topics) {
- int tryTime = 3;
- while (tryTime-- > 0) {
- try {
- List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic);
- if (logger.isDebugEnabled()) {
- logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis));
+ try {
+ for (int i = 0; i < clusters.length; i++) {
+ metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>());
+ for (String topic : topics) {
+ //try several time if partitionsFor(topic) returns null or throws exception.
+ //partitionsFor(topic) will return null if the topic is invalid or hasn't completed
+ int tryTime = 10;
+ while (tryTime-- > 0) {
+ try {
+ List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic);
+ if (ptis != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis));
+ }
+ metadata.get(clusters[i]).put(topic, ptis);
+ break;
+ }
+
+ logger.warn("Partition metadata for topic {} is null. retrying...", topic);
+
+ } catch (Exception e) {
+ logger.warn("Got Exception when trying get partition info for topic {}.", topic, e);
}
- metadata.get(clusters[i]).put(topic, ptis);
- break;
- } catch (AuthorizationException ae) {
- logger.error("Kafka AuthorizationException.");
- throw new RuntimeException("Kafka AuthorizationException.", ae);
- } catch (Exception e) {
- logger.warn("Got Exception when trying get partition info for topic {}.", topic, e);
+
try {
Thread.sleep(100);
} catch (Exception e1) {
//ignore
}
+ } //end while
+
+ if (tryTime == 0) {
+ throw new RuntimeException("Get partition info for topic completely failed. Please check the log file. topic name: " + topic);
}
}
- if (tryTime == 0) {
- throw new RuntimeException("Get partition info completely failed. Please check the log file");
- }
}
- metadataRefreshClients.get(i).close();
+ } finally {
+ closeClients();
}
- metadataRefreshClients = null;
-
List<Set<AbstractKafkaPartitioner.PartitionMeta>> parts = null;
try {
parts = assign(metadata);
@@ -169,6 +176,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
}
}
+ /**
+ * Closes every metadata-refresh consumer and then clears {@code metadataRefreshClients}.
+ * Safe to call when the list is already null. Each consumer is closed independently so one
+ * failing close() does not leak the others, and failures are logged rather than rethrown:
+ * this method is invoked from a finally block, where a new exception would hide the original
+ * failure.
+ */
+ protected void closeClients()
+ {
+ if (metadataRefreshClients == null) {
+ return;
+ }
+ try {
+ for (KafkaConsumer<byte[], byte[]> consumer : metadataRefreshClients) {
+ try {
+ consumer.close();
+ } catch (Exception e) {
+ logger.warn("Got Exception when closing metadata refresh client.", e);
+ }
+ }
+ } finally {
+ metadataRefreshClients = null;
+ }
+ }
+
+
@Override
public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map)
{
[3/5] apex-malhar git commit: Merge branch 'master' of
github.com:apache/incubator-apex-malhar
Posted by hs...@apache.org.
Merge branch 'master' of github.com:apache/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/09465e43
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/09465e43
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/09465e43
Branch: refs/heads/master
Commit: 09465e43cffc149d434d4e35a1e2d72dbc8e89ed
Parents: c887d88 c4a1129
Author: Siyuan Hua <hs...@apache.org>
Authored: Wed Jul 6 15:40:09 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Wed Jul 6 15:40:09 2016 -0700
----------------------------------------------------------------------
.../cassandra/CassandraOutputOperator.java | 4 +-
...tCassandraTransactionableOutputOperator.java | 35 +-
...assandraTransactionableOutputOperatorPS.java | 84 -----
.../cassandra/CassandraPOJOOutputOperator.java | 150 ++++++--
.../cassandra/CassandraOperatorTest.java | 134 +++++--
library/pom.xml | 7 +-
.../bandwidth/BandwidthLimitingOperator.java | 29 ++
.../lib/bandwidth/BandwidthManager.java | 131 +++++++
.../lib/bandwidth/BandwidthPartitioner.java | 78 ++++
.../db/jdbc/AbstractJdbcPOJOOutputOperator.java | 286 +++++++++++++++
...stractJdbcTransactionableOutputOperator.java | 90 ++++-
.../datatorrent/lib/db/jdbc/JdbcFieldInfo.java | 58 +++
.../db/jdbc/JdbcPOJOInsertOutputOperator.java | 183 ++++++++++
.../jdbc/JdbcPOJONonInsertOutputOperator.java | 76 ++++
.../lib/db/jdbc/JdbcPOJOOutputOperator.java | 352 -------------------
.../lib/io/fs/FileSplitterInput.java | 31 +-
.../lib/bandwidth/BandwidthManagerTest.java | 268 ++++++++++++++
.../lib/bandwidth/BandwidthPartitionerTest.java | 106 ++++++
.../com/datatorrent/lib/db/jdbc/JdbcIOApp.java | 15 +-
.../lib/db/jdbc/JdbcOperatorTest.java | 286 ++++++++++++++-
.../lib/io/fs/FileSplitterInputTest.java | 10 +-
21 files changed, 1881 insertions(+), 532 deletions(-)
----------------------------------------------------------------------