You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/04/02 10:13:53 UTC
[1/2] flink git commit: [FLINK-1589] Add option to pass configuration
to LocalExecutor
Repository: flink
Updated Branches:
refs/heads/master 79a92a647 -> 359b39c38
[FLINK-1589] Add option to pass configuration to LocalExecutor
This closes #427
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cf95862
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cf95862
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cf95862
Branch: refs/heads/master
Commit: 7cf9586223e3344c574ef58b99fdace8a34de6c4
Parents: 79a92a6
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Feb 20 12:40:41 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Apr 2 09:00:28 2015 +0200
----------------------------------------------------------------------
docs/local_execution.md | 8 ++
.../org/apache/flink/client/LocalExecutor.java | 18 ++-
.../apache/flink/api/common/PlanExecutor.java | 6 +-
.../flink/api/java/ExecutionEnvironment.java | 15 +++
.../apache/flink/api/java/LocalEnvironment.java | 10 +-
.../clients/examples/LocalExecutorITCase.java | 1 -
.../ExecutionEnvironmentITCase.java | 110 +++++++++++++++++++
7 files changed, 160 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf95862/docs/local_execution.md
----------------------------------------------------------------------
diff --git a/docs/local_execution.md b/docs/local_execution.md
index bac1745..8e7ecc4 100644
--- a/docs/local_execution.md
+++ b/docs/local_execution.md
@@ -77,6 +77,14 @@ public static void main(String[] args) throws Exception {
The `JobExecutionResult` object, which is returned after the execution finished, contains the program runtime and the accumulator results.
+The `LocalEnvironment` also allows passing custom configuration values to Flink.
+
+~~~java
+Configuration conf = new Configuration();
+conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
+final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
+~~~
+
*Note:* The local execution environments do not start any web frontend to monitor the execution.
## Collection Environment
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf95862/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 5ee4e5d..b1705df 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -53,6 +53,8 @@ public class LocalExecutor extends PlanExecutor {
private LocalFlinkMiniCluster flink;
+ private Configuration configuration;
+
// ---------------------------------- config options ------------------------------------------
@@ -70,6 +72,11 @@ public class LocalExecutor extends PlanExecutor {
}
}
+ public LocalExecutor(Configuration conf) {
+ this();
+ this.configuration = conf;
+ }
+
public boolean isDefaultOverwriteFiles() {
@@ -90,7 +97,7 @@ public class LocalExecutor extends PlanExecutor {
// --------------------------------------------------------------------------------------------
- public static Configuration getConfiguration(LocalExecutor le) {
+ public static Configuration createConfiguration(LocalExecutor le) {
Configuration configuration = new Configuration();
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, le.getTaskManagerNumSlots());
configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, le.isDefaultOverwriteFiles());
@@ -102,7 +109,10 @@ public class LocalExecutor extends PlanExecutor {
if (this.flink == null) {
// create the embedded runtime
- Configuration configuration = getConfiguration(this);
+ Configuration configuration = createConfiguration(this);
+ if(this.configuration != null) {
+ configuration.addAll(this.configuration);
+ }
// start it up
this.flink = new LocalFlinkMiniCluster(configuration, true);
} else {
@@ -135,6 +145,7 @@ public class LocalExecutor extends PlanExecutor {
* @throws Exception Thrown, if either the startup of the local execution context, or the execution
* caused an exception.
*/
+ @Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
if (plan == null) {
throw new IllegalArgumentException("The plan may not be null.");
@@ -190,8 +201,9 @@ public class LocalExecutor extends PlanExecutor {
* @return JSON dump of the optimized plan.
* @throws Exception
*/
+ @Override
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
- Optimizer pc = new Optimizer(new DataStatistics(), getConfiguration(this));
+ Optimizer pc = new Optimizer(new DataStatistics(), createConfiguration(this));
OptimizedPlan op = pc.compile(plan);
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf95862/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index 0d48add..9eaddd1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -19,6 +19,8 @@
package org.apache.flink.api.common;
+import org.apache.flink.configuration.Configuration;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -60,11 +62,11 @@ public abstract class PlanExecutor {
*
* @return A local executor.
*/
- public static PlanExecutor createLocalExecutor() {
+ public static PlanExecutor createLocalExecutor(Configuration configuration) {
Class<? extends PlanExecutor> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);
try {
- return leClass.newInstance();
+ return leClass.getConstructor(Configuration.class).newInstance(configuration);
}
catch (Throwable t) {
throw new RuntimeException("An error occurred while loading the local executor (" + LOCAL_EXECUTOR_CLASS + ").", t);
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf95862/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 9d1fc36..5e71f44 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -63,6 +63,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.NumberSequenceIterator;
@@ -1058,6 +1059,20 @@ public abstract class ExecutionEnvironment {
lee.setParallelism(parallelism);
return lee;
}
+
+ /**
+ * Creates a {@link LocalEnvironment}. The local execution environment will run the program in a
+ * multi-threaded fashion in the same JVM as the environment was created in. It will apply the
+ * configuration values specified in the parameter.
+ *
+ * @param customConfiguration Pass a custom configuration to the LocalEnvironment.
+ * @return A local execution environment with the specified custom configuration.
+ */
+ public static LocalEnvironment createLocalEnvironment(Configuration customConfiguration) {
+ LocalEnvironment lee = new LocalEnvironment();
+ lee.setConfiguration(customConfiguration);
+ return lee;
+ }
/**
* Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf95862/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index beecaf6..33bebf6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.configuration.Configuration;
/**
* An {@link ExecutionEnvironment} that runs the program locally, multi-threaded, in the JVM where the
@@ -32,6 +33,7 @@ import org.apache.flink.api.common.PlanExecutor;
* machine.
*/
public class LocalEnvironment extends ExecutionEnvironment {
+ private Configuration configuration;
/**
* Creates a new local environment.
*/
@@ -47,7 +49,7 @@ public class LocalEnvironment extends ExecutionEnvironment {
public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);
- PlanExecutor executor = PlanExecutor.createLocalExecutor();
+ PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
return executor.executePlan(p);
}
@@ -55,7 +57,7 @@ public class LocalEnvironment extends ExecutionEnvironment {
public String getExecutionPlan() throws Exception {
Plan p = createProgramPlan(null, false);
- PlanExecutor executor = PlanExecutor.createLocalExecutor();
+ PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
return executor.getOptimizerPlanAsJSON(p);
}
// --------------------------------------------------------------------------------------------
@@ -65,4 +67,8 @@ public class LocalEnvironment extends ExecutionEnvironment {
return "Local Environment (parallelism = " + (getParallelism() == -1 ? "default" : getParallelism())
+ ") : " + getIdString();
}
+
+ public void setConfiguration(Configuration customConfiguration) {
+ this.configuration = customConfiguration;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf95862/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
index 62e2893..4f74740 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
@@ -61,6 +61,5 @@ public class LocalExecutorITCase {
e.printStackTrace();
Assert.fail(e.getMessage());
}
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf95862/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
new file mode 100644
index 0000000..4ebc381
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Tests the ExecutionEnvironment from a user's perspective.
+ */
+@RunWith(Parameterized.class)
+public class ExecutionEnvironmentITCase extends MultipleProgramsTestBase {
+ private static final int PARALLELISM = 5;
+
+ public ExecutionEnvironmentITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Parameterized.Parameters(name = "Execution mode = {0}")
+ public static Collection<TestExecutionMode[]> executionModes(){
+ Collection<TestExecutionMode[]> c = new ArrayList<TestExecutionMode[]>(1);
+ c.add(new TestExecutionMode[] {TestExecutionMode.CLUSTER});
+ return c;
+ }
+
+
+ /**
+ * Ensure that the user can pass a custom configuration object to the LocalEnvironment
+ */
+ @Test
+ public void testLocalEnvironmentWithConfig() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+ final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
+ env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+
+ DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
+ .rebalance()
+ .mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
+ @Override
+ public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
+ out.collect(getRuntimeContext().getIndexOfThisSubtask());
+ }
+ });
+ List<Integer> resultCollection = new ArrayList<Integer>();
+ result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
+ env.execute();
+ assertEquals(PARALLELISM, resultCollection.size());
+ }
+
+ private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
+ private transient boolean emitted;
+
+ @Override
+ public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+ assertEquals(PARALLELISM, numSplits);
+ return super.createInputSplits(numSplits);
+ }
+
+ @Override
+ public boolean reachedEnd() {
+ return emitted;
+ }
+
+ @Override
+ public Integer nextRecord(Integer reuse) {
+ if (emitted) {
+ return null;
+ }
+ emitted = true;
+ return 1;
+ }
+ }
+}
[2/2] flink git commit: [FLINK-1753] [streaming] Added test for Kafka
connector with tuple type
Posted by rm...@apache.org.
[FLINK-1753] [streaming] Added test for Kafka connector with tuple type
This closes #557
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/359b39c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/359b39c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/359b39c3
Branch: refs/heads/master
Commit: 359b39c383347b14c1a3e382d59717ac1be1b222
Parents: 7cf9586
Author: Gábor Hermann <re...@gmail.com>
Authored: Wed Apr 1 14:51:04 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Apr 2 09:21:23 2015 +0200
----------------------------------------------------------------------
.../connectors/kafka/api/KafkaSink.java | 9 +-
.../connectors/kafka/api/KafkaSource.java | 20 +
.../kafka/api/config/PartitionerWrapper.java | 2 +-
.../streaming/connectors/kafka/KafkaITCase.java | 428 ++++++++++++++++---
4 files changed, 406 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/359b39c3/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index f1dbc8c..0bbf9a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -34,7 +34,6 @@ import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.DefaultEncoder;
-import kafka.serializer.StringEncoder;
/**
* Sink that emits its inputs to a Kafka topic.
@@ -123,7 +122,9 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
props.put("request.required.acks", "1");
props.put("serializer.class", DefaultEncoder.class.getCanonicalName());
- props.put("key.serializer.class", StringEncoder.class.getCanonicalName());
+
+ // this will not be used as the key will not be serialized
+ props.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
if (partitioner != null) {
props.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
@@ -152,7 +153,9 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
@Override
public void invoke(IN next) {
byte[] serialized = schema.serialize(next);
- producer.send(new KeyedMessage<IN, byte[]>(topicId, next, serialized));
+
+ // Sending message without serializable key.
+ producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/359b39c3/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 4a6da3b..a0805c0 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -66,6 +66,26 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
/**
* Creates a KafkaSource that consumes a topic.
+ *
+ * @param zookeeperAddress
+ * Address of the Zookeeper host (with port number).
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param groupId
+ * ID of the consumer group.
+ * @param deserializationSchema
+ * User defined deserialization schema.
+ * @param zookeeperSyncTimeMillis
+ * Synchronization time with zookeeper.
+ */
+ public KafkaSource(String zookeeperAddress,
+ String topicId, String groupId,
+ DeserializationSchema<OUT> deserializationSchema,
+ long zookeeperSyncTimeMillis) {
+ this(zookeeperAddress, topicId, groupId, deserializationSchema, zookeeperSyncTimeMillis, null);
+ }
+ /**
+ * Creates a KafkaSource that consumes a topic.
*
* @param zookeeperAddress
* Address of the Zookeeper host (with port number).
http://git-wip-us.apache.org/repos/asf/flink/blob/359b39c3/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
index f9dd21f..7ae17df 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
@@ -32,7 +32,7 @@ import kafka.utils.VerifiableProperties;
*
* This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance.
*
- * The serialziable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
+ * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
*/
public class PartitionerWrapper implements Partitioner {
public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
http://git-wip-us.apache.org/repos/asf/flink/blob/359b39c3/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 95609f9..9344722 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -17,6 +17,9 @@
package org.apache.flink.streaming.connectors.kafka;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
@@ -24,7 +27,9 @@ import java.net.UnknownHostException;
import java.util.BitSet;
import java.util.Properties;
+import org.apache.commons.lang.SerializationUtils;
import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -33,14 +38,19 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
+import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
@@ -61,74 +71,390 @@ public class KafkaITCase {
private static final Logger LOG = LoggerFactory.getLogger(KafkaITCase.class);
- private final String TOPIC = "myTopic";
+ private static int zkPort;
+ private static int kafkaPort;
+ private static String kafkaHost;
+
+ private static String zookeeperConnectionString;
- private int zkPort;
- private int kafkaPort;
- private String kafkaHost;
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+ public static File tmpZkDir;
+ public static File tmpKafkaDir;
- private String zookeeperConnectionString;
+ private static TestingServer zookeeper;
+ private static KafkaServer broker1;
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
- public File tmpZkDir;
- public File tmpKafkaDir;
- @Before
- public void prepare() throws IOException {
+ @BeforeClass
+ public static void prepare() throws IOException {
+ LOG.info("Starting KafkaITCase.prepare()");
tmpZkDir = tempFolder.newFolder();
tmpKafkaDir = tempFolder.newFolder();
kafkaHost = InetAddress.getLocalHost().getHostName();
zkPort = NetUtils.getAvailablePort();
kafkaPort = NetUtils.getAvailablePort();
zookeeperConnectionString = "localhost:" + zkPort;
- }
- @Test
- public void test() {
- LOG.info("Starting KafkaITCase.test()");
- TestingServer zookeeper = null;
- KafkaServer broker1 = null;
+ zookeeper = null;
+ broker1 = null;
+
try {
+ LOG.info("Starting Zookeeper");
zookeeper = getZookeeper();
+ LOG.info("Starting KafkaServer");
broker1 = getKafkaServer(0);
- LOG.info("ZK and KafkaServer started. Creating test topic:");
- createTestTopic();
-
- LOG.info("Starting Kafka Topology in Flink:");
- startKafkaTopology();
-
- LOG.info("Test succeeded.");
+ LOG.info("ZK and KafkaServer started.");
} catch (Throwable t) {
LOG.warn("Test failed with exception", t);
Assert.fail("Test failed with: " + t.getMessage());
- } finally {
- LOG.info("Shutting down all services");
- if (broker1 != null) {
- broker1.shutdown();
+ }
+ }
+
+ @AfterClass
+ public static void shutDownServices() {
+ LOG.info("Shutting down all services");
+ if (broker1 != null) {
+ broker1.shutdown();
+ }
+ if (zookeeper != null) {
+ try {
+ zookeeper.stop();
+ } catch (IOException e) {
+ LOG.warn("ZK.stop() failed", e);
+ }
+ }
+ }
+
+ @Test
+ public void regularKafkaSourceTest() throws Exception {
+ LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
+
+ String topic = "regularKafkaSourceTestTopic";
+ createTestTopic(topic, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ // add consuming topology:
+ DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+ new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new TupleSerializationSchema(), 5000));
+ consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+ int elCnt = 0;
+ int start = -1;
+ BitSet validator = new BitSet(101);
+
+ @Override
+ public void invoke(Tuple2<Long, String> value) throws Exception {
+ LOG.info("Got " + value);
+ String[] sp = value.f1.split("-");
+ int v = Integer.parseInt(sp[1]);
+
+ assertEquals(value.f0 - 1000, (long) v);
+
+ if (start == -1) {
+ start = v;
+ }
+ Assert.assertFalse("Received tuple twice", validator.get(v - start));
+ validator.set(v - start);
+ elCnt++;
+ if (elCnt == 100) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) != 100) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
}
- if (zookeeper != null) {
- try {
- zookeeper.stop();
- } catch (IOException e) {
- LOG.warn("ZK.stop() failed", e);
+ });
+
+ // add producing topology
+ DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+ private static final long serialVersionUID = 1L;
+ boolean running = true;
+
+ @Override
+ public void run(Collector<Tuple2<Long, String>> collector) throws Exception {
+ LOG.info("Starting source.");
+ int cnt = 0;
+ while (running) {
+ collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ LOG.info("Source got cancel()");
+ running = false;
+ }
+ });
+ stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema()));
+
+ try {
+ env.setParallelism(1);
+ env.execute();
+ } catch (JobExecutionException good) {
+ Throwable t = good.getCause();
+ int limit = 0;
+ while (!(t instanceof SuccessException)) {
+ t = t.getCause();
+ if (limit++ == 20) {
+ LOG.warn("Test failed with exception", good);
+ Assert.fail("Test failed with: " + good.getMessage());
}
}
}
+ LOG.info("Finished KafkaITCase.regularKafkaSourceTest()");
}
- private void createTestTopic() {
- KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
- kafkaTopicUtils.createTopic(TOPIC, 1, 1);
+ @Test
+ public void tupleTestTopology() throws Exception {
+ LOG.info("Starting KafkaITCase.tupleTestTopology()");
+
+ String topic = "tupleTestTopic";
+ createTestTopic(topic, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ // add consuming topology:
+ DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+ new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING));
+ consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+ int elCnt = 0;
+ int start = -1;
+ BitSet validator = new BitSet(101);
+
+ @Override
+ public void invoke(Tuple2<Long, String> value) throws Exception {
+ LOG.info("Got " + value);
+ String[] sp = value.f1.split("-");
+ int v = Integer.parseInt(sp[1]);
+
+ assertEquals(value.f0 - 1000, (long) v);
+
+ if (start == -1) {
+ start = v;
+ }
+ Assert.assertFalse("Received tuple twice", validator.get(v - start));
+ validator.set(v - start);
+ elCnt++;
+ if (elCnt == 100) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) != 100) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
+ }
+ });
+
+ // add producing topology
+ DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+ private static final long serialVersionUID = 1L;
+ boolean running = true;
+
+ @Override
+ public void run(Collector<Tuple2<Long, String>> collector) throws Exception {
+ LOG.info("Starting source.");
+ int cnt = 0;
+ while (running) {
+ collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ LOG.info("Source got cancel()");
+ running = false;
+ }
+ });
+ stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema()));
+
+ try {
+ env.setParallelism(1);
+ env.execute();
+ } catch (JobExecutionException good) {
+ Throwable t = good.getCause();
+ int limit = 0;
+ while (!(t instanceof SuccessException)) {
+ t = t.getCause();
+ if (limit++ == 20) {
+ LOG.warn("Test failed with exception", good);
+ Assert.fail("Test failed with: " + good.getMessage());
+ }
+ }
+ }
+
+ LOG.info("Finished KafkaITCase.tupleTestTopology()");
}
- private void startKafkaTopology() throws Exception {
+ private static boolean partitionerHasBeenCalled = false;
+
+ @Test
+ public void customPartitioningTestTopology() throws Exception {
+ LOG.info("Starting KafkaITCase.customPartitioningTestTopology()");
+
+ String topic = "customPartitioningTestTopic";
+
+ createTestTopic(topic, 3);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ // add consuming topology:
+ DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+ new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING));
+ consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+ int start = -1;
+ BitSet validator = new BitSet(101);
+
+ boolean gotPartition1 = false;
+ boolean gotPartition2 = false;
+ boolean gotPartition3 = false;
+
+ @Override
+ public void invoke(Tuple2<Long, String> value) throws Exception {
+ LOG.info("Got " + value);
+ String[] sp = value.f1.split("-");
+ int v = Integer.parseInt(sp[1]);
+
+ assertEquals(value.f0 - 1000, (long) v);
+
+ switch (v) {
+ case 9:
+ gotPartition1 = true;
+ break;
+ case 19:
+ gotPartition2 = true;
+ break;
+ case 99:
+ gotPartition3 = true;
+ break;
+ }
+
+ if (start == -1) {
+ start = v;
+ }
+ Assert.assertFalse("Received tuple twice", validator.get(v - start));
+ validator.set(v - start);
+
+ if (gotPartition1 && gotPartition2 && gotPartition3) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) != 100) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
+ }
+ });
+
+ // add producing topology
+ DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+ private static final long serialVersionUID = 1L;
+ boolean running = true;
+
+ @Override
+ public void run(Collector<Tuple2<Long, String>> collector) throws Exception {
+ LOG.info("Starting source.");
+ int cnt = 0;
+ while (running) {
+ collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ LOG.info("Source got cancel()");
+ running = false;
+ }
+ });
+ stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), new CustomPartitioner()));
+
+ try {
+ env.setParallelism(1);
+ env.execute();
+ } catch (JobExecutionException good) {
+ Throwable t = good.getCause();
+ int limit = 0;
+ while (!(t instanceof SuccessException)) {
+ t = t.getCause();
+ if (limit++ == 20) {
+ throw good;
+ }
+ }
+
+ assertTrue(partitionerHasBeenCalled);
+ }
+
+ LOG.info("Finished KafkaITCase.customPartitioningTestTopology()");
+ }
+
+ /**
+ * This is for a topic with 3 partitions and Tuple2<Long, String>
+ */
+ private static class CustomPartitioner implements SerializableKafkaPartitioner {
+
+ @Override
+ public int partition(Object key, int numPartitions) {
+ partitionerHasBeenCalled = true;
+
+ Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
+ if (tuple.f0 < 10) {
+ return 0;
+ } else if (tuple.f0 < 20) {
+ return 1;
+ } else {
+ return 2;
+ }
+ }
+ }
+
+ private static class TupleSerializationSchema implements DeserializationSchema<Tuple2<Long, String>>, SerializationSchema<Tuple2<Long, String>, byte[]> {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Tuple2<Long, String> deserialize(byte[] message) {
+ Object deserializedObject = SerializationUtils.deserialize(message);
+ return (Tuple2<Long, String>) deserializedObject;
+ }
+
+ @Override
+ public byte[] serialize(Tuple2<Long, String> element) {
+ return SerializationUtils.serialize(element);
+ }
+
+ @Override
+ public boolean isEndOfStream(Tuple2<Long, String> nextElement) {
+ return false;
+ }
+
+ }
+
+ @Test
+ public void simpleTestTopology() throws Exception {
+ String topic = "simpleTestTopic";
+
+ createTestTopic(topic, 1);
+
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
// add consuming topology:
DataStreamSource<String> consuming = env.addSource(
- new PersistentKafkaSource<String>(zookeeperConnectionString, TOPIC, new JavaDefaultStringSchema(), 5000, 100, Offset.FROM_BEGINNING));
+ new PersistentKafkaSource<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema(), 5000, 100, Offset.FROM_BEGINNING));
consuming.addSink(new SinkFunction<String>() {
int elCnt = 0;
int start = -1;
@@ -176,11 +502,11 @@ public class KafkaITCase {
@Override
public void cancel() {
- LOG.info("Source got chancel()");
+ LOG.info("Source got cancel()");
running = false;
}
});
- stream.addSink(new KafkaSink<String>(zookeeperConnectionString, TOPIC, new JavaDefaultStringSchema()));
+ stream.addSink(new KafkaSink<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema()));
try {
env.setParallelism(1);
@@ -191,21 +517,28 @@ public class KafkaITCase {
while (!(t instanceof SuccessException)) {
t = t.getCause();
if (limit++ == 20) {
- throw good;
+ LOG.warn("Test failed with exception", good);
+ Assert.fail("Test failed with: " + good.getMessage());
}
}
}
}
- private TestingServer getZookeeper() throws Exception {
+ private void createTestTopic(String topic, int numberOfPartitions) {
+ KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
+ kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1);
+ }
+
+
+ private static TestingServer getZookeeper() throws Exception {
return new TestingServer(zkPort, tmpZkDir);
}
/**
* Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
*/
- private KafkaServer getKafkaServer(int brokerId) throws UnknownHostException {
+ private static KafkaServer getKafkaServer(int brokerId) throws UnknownHostException {
Properties kafkaProperties = new Properties();
// properties have to be Strings
kafkaProperties.put("advertised.host.name", kafkaHost);
@@ -220,13 +553,12 @@ public class KafkaITCase {
return server;
}
- public class LocalSystemTime implements Time {
+ public static class LocalSystemTime implements Time {
@Override
public long milliseconds() {
return System.currentTimeMillis();
}
-
public long nanoseconds() {
return System.nanoTime();
}
@@ -243,9 +575,7 @@ public class KafkaITCase {
}
public static class SuccessException extends Exception {
-
private static final long serialVersionUID = 1L;
-
}
}