Posted to commits@apex.apache.org by th...@apache.org on 2017/01/22 19:08:00 UTC
apex-malhar git commit: fixing all checkstyle violations,
deleting maxAllowedViolations config in pom
Repository: apex-malhar
Updated Branches:
refs/heads/master 4cbbb7507 -> 623b803f5
fixing all checkstyle violations,
deleting maxAllowedViolations config in pom
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/623b803f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/623b803f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/623b803f
Branch: refs/heads/master
Commit: 623b803f5b960b3e33054e2b60e8c156b7b0150a
Parents: 4cbbb75
Author: Apex Dev <de...@apex.apache.org>
Authored: Wed Jan 18 17:14:32 2017 -0800
Committer: Oliver W <ol...@datatorrent.com>
Committed: Fri Jan 20 11:48:58 2017 -0800
----------------------------------------------------------------------
kafka/pom.xml | 8 ---
.../kafka/AbstractKafkaInputOperator.java | 48 +++++++--------
.../kafka/AbstractKafkaOutputOperator.java | 4 +-
.../malhar/kafka/AbstractKafkaPartitioner.java | 32 +++++-----
.../apex/malhar/kafka/KafkaConsumerWrapper.java | 13 ++---
.../apex/malhar/kafka/KafkaPartition.java | 4 --
...afkaSinglePortExactlyOnceOutputOperator.java | 61 ++++++++++----------
.../kafka/KafkaSinglePortOutputOperator.java | 11 ++--
.../apex/malhar/kafka/OneToManyPartitioner.java | 6 +-
.../apex/malhar/kafka/OneToOnePartitioner.java | 3 +-
.../apex/malhar/kafka/PartitionStrategy.java | 3 +-
.../apache/apex/malhar/kafka/EmbeddedKafka.java | 7 ++-
.../kafka/KafkaConsumerPropertiesTest.java | 10 ++--
.../apache/apex/malhar/kafka/KafkaHelper.java | 3 +-
.../malhar/kafka/KafkaInputOperatorTest.java | 50 ++++++++--------
.../malhar/kafka/KafkaOperatorTestBase.java | 25 ++++----
.../malhar/kafka/KafkaOutputOperatorTest.java | 35 ++++++-----
.../apex/malhar/kafka/KafkaTestPartitioner.java | 8 ++-
.../apex/malhar/kafka/KafkaTestProducer.java | 20 ++++---
19 files changed, 182 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/kafka/pom.xml b/kafka/pom.xml
index 590fa09..654c62c 100755
--- a/kafka/pom.xml
+++ b/kafka/pom.xml
@@ -177,14 +177,6 @@
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <configuration>
- <maxAllowedViolations>35</maxAllowedViolations>
- <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole>
- </configuration>
- </plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 6fc7693..1d05580 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -65,7 +65,7 @@ import com.datatorrent.netlet.util.DTThrowable;
*
* <ol>
* <li>Out-of-box One-to-one and one-to-many partition strategy support plus customizable partition strategy
- * refer to AbstractKafkaPartitioner </li>
+ * refer to AbstractKafkaPartitioner </li>
* <li>Fault-tolerant when the input operator goes down, it redeploys on other node</li>
* <li>At-least-once semantics for operator failure (no matter which operator fails)</li>
* <li>At-least-once semantics for cold restart (no data loss even if you restart the application)</li>
@@ -77,7 +77,9 @@ import com.datatorrent.netlet.util.DTThrowable;
* @since 3.3.0
*/
@InterfaceStability.Evolving
-public abstract class AbstractKafkaInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener, Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback
+public abstract class AbstractKafkaInputOperator implements InputOperator,
+ Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener,
+ Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback
{
private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
@@ -92,7 +94,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
{
EARLIEST, // consume from beginning of the partition every time when application restart
LATEST, // consume from latest of the partition every time when application restart
- APPLICATION_OR_EARLIEST, // consume from committed position from last run or earliest if there is no committed offset(s)
+ // consume from committed position from last run or earliest if there is no committed offset(s)
+ APPLICATION_OR_EARLIEST,
APPLICATION_OR_LATEST // consume from committed position from last run or latest if there is no committed offset(s)
}
@@ -103,7 +106,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
private String[] topics;
/**
- * offset track for checkpoint
+ * offset track for checkpoint
*/
private final Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = new HashMap<>();
@@ -148,6 +151,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
/**
* By default the strategy is one to one
+ *
* @see PartitionStrategy
*/
private PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
@@ -161,7 +165,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
/**
* store offsets with window id, only keep offsets with windows that have not been committed
*/
- private final transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = new LinkedList<>();
+ private final transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory =
+ new LinkedList<>();
/**
* Application name is used as group.id for kafka consumer
@@ -212,7 +217,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
return;
}
//ask kafka consumer wrapper to store the committed offsets
- for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) {
+ for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter =
+ offsetHistory.iterator(); iter.hasNext(); ) {
Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next();
if (item.getLeft() <= windowId) {
if (item.getLeft() == windowId) {
@@ -302,8 +308,6 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
}
}
-
-
@Override
public void setup(Context.OperatorContext context)
{
@@ -314,7 +318,6 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
operatorId = context.getId();
}
-
@Override
public void teardown()
{
@@ -382,8 +385,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
}
/**
- *
* A callback from consumer after it commits the offset
+ *
* @param map
* @param e
*/
@@ -435,9 +438,9 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
}
/**
- * Same setting as bootstrap.servers property to KafkaConsumer
- * refer to http://kafka.apache.org/documentation.html#newconsumerconfigs
- * To support multi cluster, you can have multiple bootstrap.servers separated by ";"
+ * Same setting as bootstrap.servers property to KafkaConsumer
+ * refer to http://kafka.apache.org/documentation.html#newconsumerconfigs
+ * To support multi cluster, you can have multiple bootstrap.servers separated by ";"
*/
public String getClusters()
{
@@ -474,13 +477,13 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
}
/**
- * Initial offset, it should be one of the following
- * <ul>
- * <li>earliest</li>
- * <li>latest</li>
- * <li>application_or_earliest</li>
- * <li>application_or_latest</li>
- * </ul>
+ * Initial offset, it should be one of the following
+ * <ul>
+ * <li>earliest</li>
+ * <li>latest</li>
+ * <li>application_or_earliest</li>
+ * <li>application_or_latest</li>
+ * </ul>
*/
public String getInitialOffset()
{
@@ -512,7 +515,6 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
* <li>key.deserializer</li>
* <li>value.deserializer</li>
* </ul>
- *
*/
public Properties getConsumerProps()
{
@@ -534,7 +536,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
/**
* @see <a href="http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)">
- * org.apache.kafka.clients.consumer.KafkaConsumer.poll</a>
+ * org.apache.kafka.clients.consumer.KafkaConsumer.poll</a>
*/
public long getConsumerTimeout()
{
@@ -610,8 +612,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
}
/**
- * @omitFromUI
* @return current checkpointed offsets
+ * @omitFromUI
*/
public Map<AbstractKafkaPartitioner.PartitionMeta, Long> getOffsetTrack()
{
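For reference, a minimal sketch of wiring this input operator into an Apex DAG, using the setters that appear in this diff and in KafkaInputOperatorTest below; setClusters() is assumed as the counterpart of getClusters(), and the broker addresses, topic name, and security properties are illustrative:

  KafkaSinglePortInputOperator in =
      dag.addOperator("KafkaIn", KafkaSinglePortInputOperator.class);
  in.setTopics("apexTest");                         // topic(s) to subscribe to
  // ";" separates clusters, "," separates brokers within one cluster (see getClusters() above)
  in.setClusters("localhost:9092;localhost:9093");
  in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST.name());
  in.setInitialPartitionCount(1);                   // only meaningful for the ONE_TO_MANY strategy

  Properties secureProps = new Properties();        // extra consumer properties, as in KafkaConsumerPropertiesTest
  secureProps.setProperty("security.protocol", "SASL_PLAINTEXT");
  secureProps.setProperty("sasl.kerberos.service.name", "kafka");
  in.setConsumerProps(secureProps);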
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
index f38ead9..0e16fe1 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
@@ -20,10 +20,12 @@
package org.apache.apex.malhar.kafka;
import java.util.Properties;
+
import javax.validation.constraints.NotNull;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
+
import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
@@ -99,7 +101,7 @@ public abstract class AbstractKafkaOutputOperator<K, V> implements Operator
*/
public void setProperty(Object key, Object val)
{
- properties.put(key,val);
+ properties.put(key, val);
}
public String getTopic()
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index c9b40be..791972f 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -47,8 +47,6 @@ import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.lib.util.KryoCloneUtils;
-import kafka.common.AuthorizationException;
-
/**
* Abstract partitioner used to manage the partitions of kafka input operator.
* It use a number of kafka consumers(one for each cluster) to get the latest partition metadata for topics that
@@ -74,8 +72,8 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
private ArrayList<KafkaConsumer<byte[], byte[]>> metadataRefreshClients;
-
- private final List<Set<AbstractKafkaPartitioner.PartitionMeta>> currentPartitions = new LinkedList<>(); // prevent null
+ // prevent null
+ private final List<Set<AbstractKafkaPartitioner.PartitionMeta>> currentPartitions = new LinkedList<>();
public AbstractKafkaPartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator prototypeOperator)
{
@@ -84,12 +82,11 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
this.prototypeOperator = prototypeOperator;
}
- abstract List<Set<PartitionMeta>> assign(Map<String, Map<String,List<PartitionInfo>>> metadata);
-
-
+ abstract List<Set<PartitionMeta>> assign(Map<String, Map<String, List<PartitionInfo>>> metadata);
@Override
- public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext)
+ public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(
+ Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext)
{
initMetadataClients();
@@ -127,7 +124,8 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
} //end while
if (tryTime == 0) {
- throw new RuntimeException("Get partition info for topic completely failed. Please check the log file. topic name: " + topic);
+ throw new RuntimeException(
+ "Get partition info for topic completely failed. Please check the log file. topic name: " + topic);
}
}
}
@@ -143,7 +141,6 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
e.printStackTrace();
}
-
if (currentPartitions == parts || currentPartitions.equals(parts)) {
logger.debug("No partition change found");
return collection;
@@ -153,7 +150,7 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
currentPartitions.addAll(parts);
int i = 0;
List<Partition<AbstractKafkaInputOperator>> result = new LinkedList<>();
- for (Iterator<Partition<AbstractKafkaInputOperator>> iter = collection.iterator(); iter.hasNext();) {
+ for (Iterator<Partition<AbstractKafkaInputOperator>> iter = collection.iterator(); iter.hasNext(); ) {
Partition<AbstractKafkaInputOperator> nextPartition = iter.next();
if (parts.remove(nextPartition.getPartitionedInstance().assignment())) {
if (logger.isInfoEnabled()) {
@@ -186,7 +183,6 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
metadataRefreshClients = null;
}
-
@Override
public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map)
{
@@ -201,12 +197,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
return response;
}
- protected Partitioner.Partition<AbstractKafkaInputOperator> createPartition(Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment)
+ protected Partitioner.Partition<AbstractKafkaInputOperator> createPartition(
+ Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment)
{
- Partitioner.Partition<AbstractKafkaInputOperator> p = new DefaultPartition<AbstractKafkaInputOperator>(KryoCloneUtils.cloneObject(prototypeOperator));
+ Partitioner.Partition<AbstractKafkaInputOperator> p =
+ new DefaultPartition<AbstractKafkaInputOperator>(KryoCloneUtils.cloneObject(prototypeOperator));
p.getPartitionedInstance().assign(partitionAssignment);
return p;
}
+
/**
*
*/
@@ -243,12 +242,13 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
* @param prop
* @return String
*/
- private String getPropertyAsString(Properties prop) {
+ private String getPropertyAsString(Properties prop)
+ {
StringWriter writer = new StringWriter();
try {
prop.store(writer, "");
} catch (IOException e) {
- logger.error("Cannot retrieve consumer properties for Logging : {}", e.getMessage() );
+ logger.error("Cannot retrieve consumer properties for Logging : {}", e.getMessage());
}
return writer.getBuffer().toString();
}
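For context, a sketch of the metadata-fetch retry pattern that produces the "Get partition info for topic completely failed" error above; the retry count and sleep interval here are illustrative, not the operator's actual values:

  List<PartitionInfo> fetchPartitions(KafkaConsumer<byte[], byte[]> metadataClient, String topic)
  {
    int tryTime = 10;                                      // illustrative bound
    while (tryTime-- > 0) {
      List<PartitionInfo> partitions = metadataClient.partitionsFor(topic);
      if (partitions != null && !partitions.isEmpty()) {
        return partitions;
      }
      try {
        Thread.sleep(100);                                 // wait before asking the broker again
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    throw new RuntimeException(
        "Get partition info for topic completely failed. Please check the log file. topic name: " + topic);
  }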
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
index fa4856e..2b3c762 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
@@ -133,7 +133,8 @@ public class KafkaConsumerWrapper implements Closeable
try {
kc.position(tp);
} catch (NoOffsetForPartitionException e) {
- //the poll() method of a consumer will throw exception if any of subscribed consumers not initialized with position
+ //the poll() method of a consumer will throw exception
+ // if any of subscribed consumers not initialized with position
handleNoOffsetForPartitionException(e, kc);
}
kc.pause(tp);
@@ -145,7 +146,7 @@ public class KafkaConsumerWrapper implements Closeable
while (windowCount > 0) {
try {
ConsumerRecords<byte[], byte[]> records = kc.poll(ownerOperator.getConsumerTimeout());
- for (Iterator<ConsumerRecord<byte[], byte[]>> cri = records.iterator(); cri.hasNext() && windowCount > 0;) {
+ for (Iterator<ConsumerRecord<byte[], byte[]>> cri = records.iterator(); cri.hasNext() && windowCount > 0; ) {
ownerOperator.emitTuple(meta.getCluster(), cri.next());
windowCount--;
}
@@ -194,7 +195,6 @@ public class KafkaConsumerWrapper implements Closeable
{
try {
-
while (wrapper.isAlive.get()) {
if (wrapper.waitForReplay) {
Thread.sleep(100);
@@ -229,7 +229,8 @@ public class KafkaConsumerWrapper implements Closeable
}
}
- protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e, KafkaConsumer<byte[], byte[]> consumer)
+ protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e,
+ KafkaConsumer<byte[], byte[]> consumer)
{
// if initialOffset is set to EARLIST or LATEST
// and the application is run as first time
@@ -260,7 +261,6 @@ public class KafkaConsumerWrapper implements Closeable
}
}
-
/**
* This method is called in the activate method of the operator
*/
@@ -289,7 +289,6 @@ public class KafkaConsumerWrapper implements Closeable
Map<AbstractKafkaPartitioner.PartitionMeta, Long> currentOffset = ownerOperator.getOffsetTrack();
-
// create one thread for each cluster
// each thread use one KafkaConsumer to consume from 1+ partition(s) of 1+ topic(s)
for (Map.Entry<String, List<TopicPartition>> e : consumerAssignment.entrySet()) {
@@ -334,7 +333,6 @@ public class KafkaConsumerWrapper implements Closeable
kafkaConsumerExecutor.submit(new ConsumerThread(e.getKey(), kc, this));
}
-
}
/**
@@ -375,7 +373,6 @@ public class KafkaConsumerWrapper implements Closeable
holdingBuffer.put(msg);
}
-
@Override
public void close() throws IOException
{
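The NoOffsetForPartitionException handling above follows a common 0.9 consumer pattern; a hedged sketch (the seekToBeginning/seekToEnd varargs signatures are from the 0.9 API, and the method name here is illustrative):

  void ensurePosition(KafkaConsumer<byte[], byte[]> kc, TopicPartition tp, boolean fromBeginning)
  {
    try {
      kc.position(tp);                 // throws if the partition has no committed or reset offset
    } catch (NoOffsetForPartitionException e) {
      if (fromBeginning) {
        kc.seekToBeginning(tp);        // EARLIEST / APPLICATION_OR_EARLIEST
      } else {
        kc.seekToEnd(tp);              // LATEST / APPLICATION_OR_LATEST
      }
    }
  }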
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
index 1646ffe..a07fe33 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
@@ -18,7 +18,6 @@
*/
package org.apache.apex.malhar.kafka;
-
import java.io.Serializable;
import org.apache.hadoop.classification.InterfaceStability;
@@ -54,7 +53,6 @@ public class KafkaPartition implements Serializable
*/
private static final long serialVersionUID = 7556802229202221546L;
-
private String clusterId;
private int partitionId;
@@ -141,6 +139,4 @@ public class KafkaPartition implements Serializable
return "KafkaPartition [clusterId=" + clusterId + ", partitionId=" + partitionId + ", topic=" + topic + "]";
}
-
-
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
index ff16610..a8e333f 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -52,47 +52,47 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
/**
* Kafka output operator with exactly once processing semantics.
- *<br>
+ * <br>
*
- * <p>
+ * <p>
* <b>Requirements</b>
* <li>In the Kafka message, only Value will be available for users</li>
* <li>Users need to provide Value deserializers for Kafka message as it is used during recovery</li>
- * <li>Value type should have well defined Equals & HashCodes, as during messages are stored in HashMaps for comparison.</li>
+ * <li>Value type should have well defined Equals & HashCodes,
+ * as during messages are stored in HashMaps for comparison.</li>
* <p>
* <b>Recovery handling</b>
* <li> Offsets of the Kafka partitions are stored in the WindowDataManager at the endWindow</li>
* <li> During recovery,
* <ul>
- * <li>Partially written Streaming Window before the crash is constructed. ( Explained below ) </li>
- * <li>Tuples from the completed Streaming Window's are skipped </li>
- * <li>Tuples coming for the partially written Streaming Window are skipped.
- * (No assumption is made on the order and the uniqueness of the tuples) </li>
- * </ul>
- * </li>
- *</p>
+ * <li>Partially written Streaming Window before the crash is constructed. ( Explained below ) </li>
+ * <li>Tuples from the completed Streaming Window's are skipped </li>
+ * <li>Tuples coming for the partially written Streaming Window are skipped.
+ * (No assumption is made on the order and the uniqueness of the tuples) </li>
+ * </ul>
+ * </li>
+ * </p>
*
* <p>
* <b>Partial Window Construction</b>
* <li> Operator uses the Key in the Kafka message, which is not available for use by the operator users.</li>
* <li> Key is used to uniquely identify the message written by the particular instance of this operator.</li>
- * This allows multiple writers to same Kafka partitions. Format of the key is "APPLICATTION_ID#OPERATOR_ID".
+ * This allows multiple writers to same Kafka partitions. Format of the key is "APPLICATTION_ID#OPERATOR_ID".
* <li>During recovery Kafka partitions are read between the latest offset and the last written offsets.</li>
- *<li>All the tuples written by the particular instance is kept in the Map</li>
- *</p>
+ * <li>All the tuples written by the particular instance is kept in the Map</li>
+ * </p>
*
* <p>
* <b>Limitations</b>
* <li> Key in the Kafka message is reserved for Operator's use </li>
- * <li> During recovery, operator needs to read tuples between 2 offsets, if there are lot of data to be read, Operator may
- * appear to be blocked to the Stram and can kill the operator. </li>
- *</p>
+ * <li> During recovery, operator needs to read tuples between 2 offsets,
+ * if there are lot of data to be read, Operator may
+ * appear to be blocked to the Stram and can kill the operator. </li>
+ * </p>
*
* @displayName Kafka Single Port Exactly Once Output(0.9.0)
* @category Messaging
* @tags output operator
- *
- *
* @since 3.5.0
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
@@ -128,7 +128,8 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
if (getProperties().getProperty(VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
- throw new IllegalArgumentException("Value deserializer needs to be set for the operator, as it is used during recovery.");
+ throw new IllegalArgumentException(
+ "Value deserializer needs to be set for the operator, as it is used during recovery.");
}
super.setup(context);
@@ -242,19 +243,19 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
return false;
}
- private Map<Integer,Long> getPartitionsAndOffsets(boolean latest) throws ExecutionException, InterruptedException
+ private Map<Integer, Long> getPartitionsAndOffsets(boolean latest) throws ExecutionException, InterruptedException
{
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(getTopic());
List<TopicPartition> topicPartitionList = new java.util.ArrayList<>();
- for ( PartitionInfo partitionInfo: partitionInfoList) {
- topicPartitionList.add(new TopicPartition(getTopic(), partitionInfo.partition()) );
+ for (PartitionInfo partitionInfo : partitionInfoList) {
+ topicPartitionList.add(new TopicPartition(getTopic(), partitionInfo.partition()));
}
- Map<Integer,Long> parttionsAndOffset = new HashMap<>();
+ Map<Integer, Long> parttionsAndOffset = new HashMap<>();
consumer.assign(topicPartitionList);
- for (PartitionInfo partitionInfo: partitionInfoList) {
+ for (PartitionInfo partitionInfo : partitionInfoList) {
try {
TopicPartition topicPartition = new TopicPartition(getTopic(), partitionInfo.partition());
if (latest) {
@@ -275,11 +276,11 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
{
logger.info("Rebuild the partial window after " + windowDataManager.getLargestCompletedWindow());
- Map<Integer,Long> storedOffsets;
- Map<Integer,Long> currentOffsets;
+ Map<Integer, Long> storedOffsets;
+ Map<Integer, Long> currentOffsets;
try {
- storedOffsets = (Map<Integer,Long>)this.windowDataManager.retrieve(windowId);
+ storedOffsets = (Map<Integer, Long>)this.windowDataManager.retrieve(windowId);
currentOffsets = getPartitionsAndOffsets(true);
} catch (IOException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
@@ -303,13 +304,13 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
List<TopicPartition> topicPartitions = new ArrayList<>();
- for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) {
+ for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
topicPartitions.add(new TopicPartition(getTopic(), entry.getKey()));
}
consumer.assign(topicPartitions);
- for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) {
+ for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
Long storedOffset = 0L;
Integer currentPartition = entry.getKey();
Long currentOffset = entry.getValue();
@@ -390,7 +391,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
return;
}
- getProducer().send(new ProducerRecord<>(getTopic(), key, tuple),new Callback()
+ getProducer().send(new ProducerRecord<>(getTopic(), key, tuple), new Callback()
{
public void onCompletion(RecordMetadata metadata, Exception e)
{
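A minimal configuration sketch for this operator, following the setup() requirement above that a value deserializer be supplied for recovery (the key serializer is set by the operator itself, since the key is reserved for its use); the topic, broker address, and the Person/KafkaHelper types from the tests are illustrative:

  KafkaSinglePortExactlyOnceOutputOperator<Person> out = new KafkaSinglePortExactlyOnceOutputOperator<>();
  out.setTopic("exactly-once-demo");

  Properties props = new Properties();
  props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaHelper.class.getName());
  props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaHelper.class.getName());
  out.setProperties(props);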
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
index 500602c..c47cf3d 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
@@ -20,6 +20,7 @@
package org.apache.apex.malhar.kafka;
import org.apache.kafka.clients.producer.ProducerRecord;
+
import com.datatorrent.api.DefaultInputPort;
/**
@@ -29,17 +30,17 @@ import com.datatorrent.api.DefaultInputPort;
* @since 3.5.0
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
-public class KafkaSinglePortOutputOperator<K,V> extends AbstractKafkaOutputOperator
+public class KafkaSinglePortOutputOperator<K, V> extends AbstractKafkaOutputOperator
{
- /**
- * This input port receives tuples that will be written out to Kafka.
- */
+ /**
+ * This input port receives tuples that will be written out to Kafka.
+ */
public final transient DefaultInputPort<V> inputPort = new DefaultInputPort<V>()
{
@Override
public void process(V tuple)
{
- getProducer().send(new ProducerRecord<K,V>(getTopic(),tuple));
+ getProducer().send(new ProducerRecord<K, V>(getTopic(), tuple));
}
};
}
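And a corresponding DAG wiring sketch for the simple (at-least-once) output operator above; the upstream operator and its output port, the topic, and the broker address are all illustrative:

  KafkaSinglePortOutputOperator<String, String> kafkaOut =
      dag.addOperator("KafkaOut", new KafkaSinglePortOutputOperator<String, String>());
  kafkaOut.setTopic("demo-topic");

  Properties props = new Properties();
  props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  kafkaOut.setProperties(props);

  dag.addStream("toKafka", upstream.output, kafkaOut.inputPort);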
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
index 3b4d3f3..eb0cc40 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
@@ -50,7 +50,8 @@ public class OneToManyPartitioner extends AbstractKafkaPartitioner
}
int partitionCount = prototypeOperator.getInitialPartitionCount();
- ArrayList<Set<PartitionMeta>> eachPartitionAssignment = new ArrayList<>(prototypeOperator.getInitialPartitionCount());
+ ArrayList<Set<PartitionMeta>> eachPartitionAssignment =
+ new ArrayList<>(prototypeOperator.getInitialPartitionCount());
int i = 0;
for (Map.Entry<String, Map<String, List<PartitionInfo>>> clusterMap : metadata.entrySet()) {
for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
@@ -59,7 +60,8 @@ public class OneToManyPartitioner extends AbstractKafkaPartitioner
if (index >= eachPartitionAssignment.size()) {
eachPartitionAssignment.add(new HashSet<PartitionMeta>());
}
- eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition()));
+ eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(),
+ topicPartition.getKey(), pif.partition()));
}
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
index 570bdea..05faab6 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
@@ -50,7 +50,8 @@ public class OneToOnePartitioner extends AbstractKafkaPartitioner
for (Map.Entry<String, Map<String, List<PartitionInfo>>> clusterMap : metadata.entrySet()) {
for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
for (PartitionInfo pif : topicPartition.getValue()) {
- currentAssignment.add(Sets.newHashSet(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition())));
+ currentAssignment.add(Sets.newHashSet(new PartitionMeta(clusterMap.getKey(),
+ topicPartition.getKey(), pif.partition())));
}
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
index 7c142c5..feafa3b 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
@@ -31,7 +31,8 @@ public enum PartitionStrategy
*/
ONE_TO_ONE,
/**
- * Each operator consumes from several kafka partitions with overall input rate under some certain hard limit in msgs/s or bytes/s
+ * Each operator consumes from several kafka partitions with overall input rate under
+ * some certain hard limit in msgs/s or bytes/s
* For now it <b>only</b> support <b>simple kafka consumer</b>
*/
ONE_TO_MANY,
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
index 5ddcb18..e9fcc36 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
@@ -118,7 +118,7 @@ public class EmbeddedKafka
{
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
- producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
+ producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
try (KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps)) {
@@ -144,9 +144,10 @@ public class EmbeddedKafka
consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
consumerProps.setProperty("group.id", "group0");
consumerProps.setProperty("client.id", "consumer0");
- consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
+ consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- consumerProps.put("auto.offset.reset", earliest ? "earliest" : "latest"); // to make sure the consumer starts from the beginning of the topic
+ // to make sure the consumer starts from the beginning of the topic
+ consumerProps.put("auto.offset.reset", earliest ? "earliest" : "latest");
KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList(topic));
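A short usage sketch of this embedded consumer setup: with auto.offset.reset=earliest and no committed offsets, the consumer replays the topic from the beginning (broker address and topic name illustrative):

  Properties consumerProps = new Properties();
  consumerProps.setProperty("bootstrap.servers", "localhost:9092");
  consumerProps.setProperty("group.id", "group0");
  consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
  consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  consumerProps.put("auto.offset.reset", "earliest");

  try (KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
    consumer.subscribe(Arrays.asList("demo-topic"));
    for (ConsumerRecord<Integer, byte[]> record : consumer.poll(1000)) {   // poll(long) in the 0.9 API
      System.out.println(record.offset() + ": " + new String(record.value()));
    }
  }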
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
index 83e0de6..81d7ade 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
@@ -41,6 +41,7 @@ public class KafkaConsumerPropertiesTest
public class Watcher extends TestWatcher
{
Context.OperatorContext context;
+
@Override
protected void starting(Description description)
{
@@ -50,8 +51,8 @@ public class KafkaConsumerPropertiesTest
kafkaInput.setTopics("apexTest");
kafkaInput.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
Properties prop = new Properties();
- prop.setProperty("security.protocol","SASL_PLAINTEXT");
- prop.setProperty("sasl.kerberos.service.name","kafka");
+ prop.setProperty("security.protocol", "SASL_PLAINTEXT");
+ prop.setProperty("sasl.kerberos.service.name", "kafka");
kafkaInput.setConsumerProps(prop);
}
@@ -71,8 +72,9 @@ public class KafkaConsumerPropertiesTest
kafkaInput.definePartitions(null, null);
} catch (KafkaException e) {
//Ensures the properties of the consumer are set/not reset.
- Assert.assertEquals("java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in " +
- "secure mode.", e.getCause().getMessage());
+ Assert.assertEquals(
+ "java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in "
+ + "secure mode.", e.getCause().getMessage());
}
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
index c550032..abf3a5b 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
@@ -25,7 +25,8 @@ import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-public class KafkaHelper implements Serializer<KafkaOutputOperatorTest.Person>, Deserializer<KafkaOutputOperatorTest.Person>
+public class KafkaHelper implements Serializer<KafkaOutputOperatorTest.Person>,
+ Deserializer<KafkaOutputOperatorTest.Person>
{
@Override
public KafkaOutputOperatorTest.Person deserialize(String s, byte[] bytes)
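KafkaHelper implements both sides of the (de)serialization contract for the test's Person type; a hedged sketch of the same pattern for plain Strings (Kafka already ships StringSerializer/StringDeserializer, so this class is purely illustrative):

  import java.nio.charset.StandardCharsets;
  import java.util.Map;

  import org.apache.kafka.common.serialization.Deserializer;
  import org.apache.kafka.common.serialization.Serializer;

  public class StringHelper implements Serializer<String>, Deserializer<String>
  {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
      // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, String data)
    {
      return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, byte[] bytes)
    {
      return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    @Override
    public void close()
    {
      // nothing to release
    }
  }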
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 4e97d72..f16c8f4 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -21,14 +21,10 @@ package org.apache.apex.malhar.kafka;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -54,7 +50,8 @@ import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
/**
- * A bunch of test to verify the input operator will be automatically partitioned per kafka partition This test is launching its
+ * A bunch of test to verify the input operator will be automatically partitioned
+ * per kafka partition This test is launching its
* own Kafka cluster.
*/
@RunWith(Parameterized.class)
@@ -90,7 +87,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
@Rule
public final KafkaTestInfo testInfo = new KafkaTestInfo();
-
@Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
public static Collection<Object[]> testScenario()
{
@@ -106,8 +102,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
});
}
-
-
@Before
public void before()
{
@@ -196,7 +190,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
endTuples = 0;
}
-
public void processTuple(byte[] bt)
{
String tuple = new String(bt);
@@ -220,7 +213,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
String key = operatorId + "," + currentWindowId;
List<String> msgsInWin = tupleCollectedInWindow.get(key);
if (msgsInWin != null) {
- Assert.assertEquals("replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector);
+ Assert.assertEquals(
+ "replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector);
} else {
List<String> newList = Lists.newArrayList();
newList.addAll(windowTupleCollector);
@@ -235,10 +229,11 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
int countDownTupleSize = countDownAll ? tupleSize : endTuples;
if (latch != null) {
- Assert.assertTrue("received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
+ Assert.assertTrue(
+ "received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
while (countDownTupleSize > 0) {
- latch.countDown();
- --countDownTupleSize;
+ latch.countDown();
+ --countDownTupleSize;
}
if (latch.getCount() == 0) {
/**
@@ -265,7 +260,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
}
-
/**
* Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives
* data from an outside test generator through Kafka message bus and feed that data into Malhar streaming platform.
@@ -283,7 +277,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
testInputOperator(false, false);
}
-
@Test
public void testInputOperatorWithFailure() throws Exception
{
@@ -303,7 +296,9 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
// each broker should get a END_TUPLE message
latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);
- logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {}; hasMultiPartition: {}, partition: {}",
+ logger.info(
+ "Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {};" +
+ " hasMultiPartition: {}, partition: {}",
testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);
// Start producer
@@ -319,7 +314,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
DAG dag = lma.getDAG();
// Create KafkaSinglePortStringInputOperator
- KafkaSinglePortInputOperator node = dag.addOperator("Kafka input" + testName, KafkaSinglePortInputOperator.class);
+ KafkaSinglePortInputOperator node = dag.addOperator(
+ "Kafka input" + testName, KafkaSinglePortInputOperator.class);
node.setInitialPartitionCount(1);
// set topic
node.setTopics(testName);
@@ -330,13 +326,13 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
node.setWindowDataManager(new FSWindowDataManager());
}
-
// Create Test tuple collector
CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class);
collector.isIdempotentTest = idempotent;
// Connect ports
- dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
+ dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort)
+ .setLocality(Locality.CONTAINER_LOCAL);
if (hasFailure) {
setupHasFailureTest(node, dag);
@@ -347,7 +343,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
lc.setHeartbeatMonitoringEnabled(false);
//let the Controller to run the inside another thread. It is almost same as call Controller.runAsync(),
- //but Controller.runAsync() don't expose the thread which run it, so we don't know when the thread will be terminated.
+ //but Controller.runAsync() don't expose the thread which run it,
+ //so we don't know when the thread will be terminated.
//create this thread and then call join() to make sure the Controller shutdown completely.
monitorThread = new Thread((StramLocalCluster)lc, "master");
monitorThread.start();
@@ -370,21 +367,23 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(),
expectedReceiveCount, testName, tupleCollection);
}
- Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: " + tupleCollection, notTimeout);
+ Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: "
+ + tupleCollection, notTimeout);
// Check results
- Assert.assertTrue( "testName: " + testName + "; Collected tuple size: " + tupleCollection.size() + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection,
+ Assert.assertTrue("testName: " + testName + "; Collected tuple size: " + tupleCollection.size()
+ + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection,
expectedReceiveCount == tupleCollection.size());
logger.info("End of test case: {}", testName);
}
-
private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag)
{
operator.setHoldingBufferSize(5000);
dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
- //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(APPLICATION_PATH + "failureck", new Configuration()));
+ //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(
+ // APPLICATION_PATH + "failureck", new Configuration()));
operator.setMaxTuplesPerWindow(tuplesPerWindow);
}
@@ -394,8 +393,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
return l + TEST_KAFKA_BROKER_PORT[0][0] +
(hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
(hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") +
- (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : "");
+ (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : "");
}
-
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
index a05fd9b..3910546 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
@@ -72,9 +72,9 @@ public class KafkaOperatorTestBase
}
TEST_ZOOKEEPER_PORT = new int[]{p[0], p[1]};
- TEST_KAFKA_BROKER_PORT = new int[][] {
- new int[] {p[2], p[3]},
- new int[] {p[4], p[5]}
+ TEST_KAFKA_BROKER_PORT = new int[][]{
+ new int[]{p[2], p[3]},
+ new int[]{p[4], p[5]}
};
}
@@ -93,8 +93,10 @@ public class KafkaOperatorTestBase
private static final String zkBaseDir = "zookeeper-server-data";
private static final String kafkaBaseDir = "kafka-server-data";
- private static final String[] zkdir = new String[] { "zookeeper-server-data/1", "zookeeper-server-data/2" };
- private static final String[][] kafkadir = new String[][] { new String[] { "kafka-server-data/1/1", "kafka-server-data/1/2" }, new String[] { "kafka-server-data/2/1", "kafka-server-data/2/2" } };
+ private static final String[] zkdir = new String[]{"zookeeper-server-data/1", "zookeeper-server-data/2"};
+ private static final String[][] kafkadir = new String[][]{
+ new String[]{"kafka-server-data/1/1", "kafka-server-data/1/2"},
+ new String[]{"kafka-server-data/2/1", "kafka-server-data/2/2"}};
protected boolean hasMultiPartition = false;
protected boolean hasMultiCluster = false;
@@ -132,8 +134,8 @@ public class KafkaOperatorTestBase
zkf.shutdown();
}
}
- zkServer = new ZooKeeperServer[2];
- zkFactory = new ServerCnxnFactory[2];
+ zkServer = new ZooKeeperServer[2];
+ zkFactory = new ServerCnxnFactory[2];
}
public static void startKafkaServer(int clusterid, int brokerid)
@@ -156,7 +158,8 @@ public class KafkaOperatorTestBase
{
FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
- //boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition }, new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
+ //boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition },
+ // new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
startKafkaServer(0, 0);
startKafkaServer(0, 1);
startKafkaServer(1, 0);
@@ -261,13 +264,15 @@ public class KafkaOperatorTestBase
// TODO Auto-generated constructor stub
}
- public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder) throws IOException
+ public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder)
+ throws IOException
{
super(txnLogFactory, tickTime, treeBuilder);
// TODO Auto-generated constructor stub
}
- public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
+ public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout,
+ int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
{
super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
// TODO Auto-generated constructor stub
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
index 58d69f6..7abf0f8 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
@@ -31,7 +31,6 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
-
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -145,7 +144,8 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
Assert.assertTrue("No failure", fromKafka.size() > toKafka.size());
}
- private void sendDataToKafka(boolean exactlyOnce, List<Person> toKafka, boolean hasFailure, boolean differentTuplesAfterRecovery) throws InterruptedException
+ private void sendDataToKafka(boolean exactlyOnce, List<Person> toKafka, boolean hasFailure,
+ boolean differentTuplesAfterRecovery) throws InterruptedException
{
Properties props = new Properties();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER);
@@ -159,7 +159,8 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
attributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp");
attributeMap.put(DAG.APPLICATION_PATH, APPLICATION_PATH);
- OperatorContextTestHelper.TestIdOperatorContext operatorContext = new OperatorContextTestHelper.TestIdOperatorContext(2, attributeMap);
+ OperatorContextTestHelper.TestIdOperatorContext operatorContext =
+ new OperatorContextTestHelper.TestIdOperatorContext(2, attributeMap);
cleanUp(operatorContext);
@@ -167,11 +168,13 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
DefaultInputPort<Person> inputPort;
if (exactlyOnce) {
- KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext);
+ KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp =
+ ResetKafkaOutput(testName, props, operatorContext);
inputPort = kafkaOutputTemp.inputPort;
kafkaOutput = kafkaOutputTemp;
} else {
- KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext);
+ KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp =
+ ResetKafkaSimpleOutput(testName, props, operatorContext);
inputPort = kafkaOutputTemp.inputPort;
kafkaOutput = kafkaOutputTemp;
}
@@ -192,11 +195,13 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
if (hasFailure) {
if (exactlyOnce) {
- KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext);
+ KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp =
+ ResetKafkaOutput(testName, props, operatorContext);
inputPort = kafkaOutputTemp.inputPort;
kafkaOutput = kafkaOutputTemp;
} else {
- KafkaSinglePortOutputOperator<String,Person> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext);
+ KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp =
+ ResetKafkaSimpleOutput(testName, props, operatorContext);
inputPort = kafkaOutputTemp.inputPort;
kafkaOutput = kafkaOutputTemp;
}
@@ -225,7 +230,8 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
cleanUp(operatorContext);
}
- private KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput(String testName, Properties props, Context.OperatorContext operatorContext)
+ private KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput(
+ String testName, Properties props, Context.OperatorContext operatorContext)
{
KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>();
kafkaOutput.setTopic(testName);
@@ -235,9 +241,10 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
return kafkaOutput;
}
- private KafkaSinglePortOutputOperator<String, Person> ResetKafkaSimpleOutput(String testName, Properties props, Context.OperatorContext operatorContext)
+ private KafkaSinglePortOutputOperator<String, Person> ResetKafkaSimpleOutput(
+ String testName, Properties props, Context.OperatorContext operatorContext)
{
- KafkaSinglePortOutputOperator<String,Person> kafkaOutput = new KafkaSinglePortOutputOperator<>();
+ KafkaSinglePortOutputOperator<String, Person> kafkaOutput = new KafkaSinglePortOutputOperator<>();
kafkaOutput.setTopic(testName);
kafkaOutput.setProperties(props);
kafkaOutput.setup(operatorContext);
@@ -263,7 +270,7 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
}
for (int i = 0; i < fromKafka.size(); ++i) {
- if ( !fromKafka.get(i).equals(toKafka.get(i))) {
+ if (!fromKafka.get(i).equals(toKafka.get(i))) {
return false;
}
}
@@ -275,9 +282,9 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
{
String l = "localhost:";
return l + TEST_KAFKA_BROKER_PORT[0][0] +
- (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
- (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") +
- (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : "");
+ (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
+ (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") +
+ (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : "");
}
private List<Person> GenerateList()
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
index 21f8977..6098bde 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
@@ -33,11 +33,13 @@ import kafka.utils.VerifiableProperties;
*/
public class KafkaTestPartitioner implements Partitioner
{
- public KafkaTestPartitioner(VerifiableProperties props) {
+ public KafkaTestPartitioner(VerifiableProperties props)
+ {
}
- public KafkaTestPartitioner() {
+ public KafkaTestPartitioner()
+ {
}
@@ -45,7 +47,7 @@ public class KafkaTestPartitioner implements Partitioner
public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster)
{
int num_partitions = cluster.partitionsForTopic(topic).size();
- return Integer.parseInt((String)key)%num_partitions;
+ return Integer.parseInt((String)key) % num_partitions;
}
@Override
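This partitioner is plugged into a producer through partitioner.class, mirroring the configuration in KafkaTestProducer below; a minimal sketch with an illustrative broker address, topic, and key:

  Properties props = new Properties();
  props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaTestPartitioner.class.getName());
  props.setProperty(ProducerConfig.ACKS_CONFIG, "1");

  try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("demo-topic", "42", "payload_1"));   // key "42" picks partition 42 % partitionCount
    producer.flush();
  }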
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
index ca6cc98..0f18666 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
@@ -62,7 +62,8 @@ public class KafkaTestProducer implements Runnable
this.sendCount = sendCount;
}
- public void setMessages(List<String> messages) {
+ public void setMessages(List<String> messages)
+ {
this.messages = messages;
}
@@ -72,8 +73,8 @@ public class KafkaTestProducer implements Runnable
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaTestPartitioner.class.getName());
- String brokerList = "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][0];
- brokerList += hasPartition ? (",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][1]):"";
+ String brokerList = "localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][0];
+ brokerList += hasPartition ? (",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][1]) : "";
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.setProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG, "20000");
props.setProperty(ProducerConfig.ACKS_CONFIG, getAckType());
@@ -94,14 +95,15 @@ public class KafkaTestProducer implements Runnable
this.hasPartition = hasPartition;
this.hasMultiCluster = hasMultiCluster;
producer = new KafkaProducer<>(createProducerConfig(0));
- if(hasMultiCluster){
+ if (hasMultiCluster) {
producer1 = new KafkaProducer<>(createProducerConfig(1));
} else {
producer1 = null;
}
}
- public KafkaTestProducer(String topic, boolean hasPartition) {
+ public KafkaTestProducer(String topic, boolean hasPartition)
+ {
this(topic, hasPartition, false);
}
@@ -115,7 +117,7 @@ public class KafkaTestProducer implements Runnable
String messageStr = "_" + messageNo++;
int k = rand.nextInt(100);
sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + k, "c1" + messageStr)));
- if(hasMultiCluster && messageNo <= sendCount){
+ if (hasMultiCluster && messageNo <= sendCount) {
messageStr = "_" + messageNo++;
sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + k, "c2" + messageStr)));
}
@@ -142,7 +144,7 @@ public class KafkaTestProducer implements Runnable
}
producer.flush();
- if (producer1!=null) {
+ if (producer1 != null) {
producer1.flush();
}
@@ -160,7 +162,7 @@ public class KafkaTestProducer implements Runnable
public void close()
{
producer.close();
- if (producer1!=null) {
+ if (producer1 != null) {
producer1.close();
}
}
@@ -174,4 +176,4 @@ public class KafkaTestProducer implements Runnable
{
this.ackType = ackType;
}
-} // End of KafkaTestProducer
\ No newline at end of file
+} // End of KafkaTestProducer