Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/29 15:46:05 UTC
[1/6] flink git commit: [FLINK-2112] [streaming] Proper package for kafka tests
Repository: flink
Updated Branches:
refs/heads/master 84fce54b6 -> 0930179f4
[FLINK-2112] [streaming] Proper package for kafka tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0fd60233
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0fd60233
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0fd60233
Branch: refs/heads/master
Commit: 0fd60233bbdf55139de8bc36c16ff8fa33bdc995
Parents: 145b2ba
Author: mbalassi <mb...@apache.org>
Authored: Fri May 29 14:17:27 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri May 29 15:08:32 2015 +0200
----------------------------------------------------------------------
.../streaming/connectors/kafka/KafkaITCase.java | 1023 ++++++++++++++++++
.../kafka/util/KafkaLocalSystemTime.java | 48 +
.../flink/streaming/kafka/KafkaITCase.java | 1023 ------------------
.../kafka/util/KafkaLocalSystemTime.java | 48 -
4 files changed, 1071 insertions(+), 1071 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0fd60233/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
new file mode 100644
index 0000000..7b7bdcc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -0,0 +1,1023 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+
+import kafka.admin.AdminUtils;
+import kafka.api.PartitionMetadata;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.network.SocketServer;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
+import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
+import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.StringUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import scala.collection.Seq;
+
+/**
+ * Code in this test is based on the following GitHub repository:
+ * https://github.com/sakserv/hadoop-mini-clusters (ASL licensed),
+ * as per commit bc6b2b2d5f6424d5f377aa6c0871e82a956462ef.
+ */
+
+public class KafkaITCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaITCase.class);
+ private static final int NUMBER_OF_KAFKA_SERVERS = 3;
+
+ private static int zkPort;
+ private static String kafkaHost;
+
+ private static String zookeeperConnectionString;
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+ public static File tmpZkDir;
+ public static List<File> tmpKafkaDirs;
+
+ private static TestingServer zookeeper;
+ private static List<KafkaServer> brokers;
+ private static String brokerConnectionStrings = "";
+
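+ // consumer configuration shared by the tests below; built in prepare() so that reads start from the smallest offset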
+ private static ConsumerConfig standardCC;
+
+ private static ZkClient zkClient;
+
+
+ @BeforeClass
+ public static void prepare() throws IOException {
+ LOG.info("Starting KafkaITCase.prepare()");
+ tmpZkDir = tempFolder.newFolder();
+
+ tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_KAFKA_SERVERS);
+ for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
+ tmpKafkaDirs.add(tempFolder.newFolder());
+ }
+
+ kafkaHost = InetAddress.getLocalHost().getHostName();
+ zkPort = NetUtils.getAvailablePort();
+ zookeeperConnectionString = "localhost:" + zkPort;
+
+ zookeeper = null;
+ brokers = null;
+
+ try {
+ LOG.info("Starting Zookeeper");
+ zookeeper = getZookeeper();
+ LOG.info("Starting KafkaServer");
+ brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
+ for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
+ brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+ SocketServer socketServer = brokers.get(i).socketServer();
+ String host = "localhost";
+ if(socketServer.host() != null) {
+ host = socketServer.host();
+ }
+ brokerConnectionStrings += host+":"+socketServer.port()+",";
+ }
+
+ LOG.info("ZK and KafkaServer started.");
+ } catch (Throwable t) {
+ LOG.warn("Test failed with exception", t);
+ Assert.fail("Test failed with: " + t.getMessage());
+ }
+
+ Properties cProps = new Properties();
+ cProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+ cProps.setProperty("group.id", "flink-tests");
+ cProps.setProperty("auto.commit.enable", "false");
+
+ cProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning.
+
+ standardCC = new ConsumerConfig(cProps);
+
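+ // shared ZkClient for administrative work in the tests (topic creation, topic metadata lookups)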
+ zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+ }
+
+ @AfterClass
+ public static void shutDownServices() {
+ LOG.info("Shutting down all services");
+ for (KafkaServer broker : brokers) {
+ if (broker != null) {
+ broker.shutdown();
+ }
+ }
+ if (zookeeper != null) {
+ try {
+ zookeeper.stop();
+ } catch (IOException e) {
+ LOG.warn("ZK.stop() failed", e);
+ }
+ }
+ zkClient.close();
+ }
+
+
+ @Test
+ public void testOffsetManipulation() {
+ ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+
+ final String topicName = "testOffsetManipulation";
+
+ // create topic
+ Properties topicConfig = new Properties();
+ LOG.info("Creating topic {}", topicName);
+ AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
+
+ PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 1337);
+
+ Assert.assertEquals(1337L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
+
+ zk.close();
+ }
+
+ /**
+ * We want to use the high-level Java consumer API but manage the offsets in ZooKeeper manually.
+ */
+ @Test
+ public void testPersistentSourceWithOffsetUpdates() throws Exception {
+ LOG.info("Starting testPersistentSourceWithOffsetUpdates()");
+
+ ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+
+ final String topicName = "testOffsetHacking";
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3);
+ env.getConfig().disableSysoutLogging();
+ env.enableCheckpointing(50);
+ env.setNumberOfExecutionRetries(0);
+
+ // create topic
+ Properties topicConfig = new Properties();
+ LOG.info("Creating topic {}", topicName);
+ AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
+
+ // write a sequence from 0 to 99 to each of the three partitions.
+ writeSequence(env, topicName, 0, 99);
+
+
+ readSequence(env, standardCC, topicName, 0, 100, 300);
+
+ // check that the offsets are set at least higher than 50.
+ // Ideally we would expect them to be set to 99, but right now there is no way of stopping a topology
+ // once all pending checkpoints have been committed.
+ // To work around that limitation, the persistent Kafka consumer is throttled with a Thread.sleep().
+ long o1 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0);
+ long o2 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1);
+ long o3 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2);
+ Assert.assertTrue("The offset seems incorrect, got "+o1, o1 > 50L);
+ Assert.assertTrue("The offset seems incorrect, got "+o2, o2 > 50L);
+ Assert.assertTrue("The offset seems incorrect, got "+o3, o3 > 50L);
+ /** Once we have proper shutdown of streaming jobs, enable these tests
+ Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
+ Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1));
+ Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2));*/
+
+
+ LOG.info("Manipulating offsets");
+ // set the offset to 50 for each of the three partitions
+ PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 50);
+ PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 1, 50);
+ PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 2, 50);
+
+ // create new env
+ env = StreamExecutionEnvironment.createLocalEnvironment(3);
+ env.getConfig().disableSysoutLogging();
+ readSequence(env, standardCC, topicName, 50, 50, 150);
+
+ zk.close();
+
+ LOG.info("Finished testPersistentSourceWithOffsetUpdates()");
+ }
+
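+ /**
+ * Reads {@code finalCount} tuples from the given topic with three parallel readers and verifies that each
+ * of the {@code valuesCount} values starting at {@code valuesStartFrom} was seen exactly three times.
+ */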
+ private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, final String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
+ LOG.info("Reading sequence for verification until final count {}", finalCount);
+ DataStream<Tuple2<Integer, Integer>> source = env.addSource(
+ new PersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), env.getConfig()), cc)
+ )
+ // add a sleeping mapper. Since there is no good way of "shutting down" a running topology, we have
+ // to play this trick. The problem is that we have to wait until all checkpoints are confirmed.
+ .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+ @Override
+ public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+ Thread.sleep(150);
+ return value;
+ }
+ }).setParallelism(3);
+
+ // verify data
+ DataStream<Integer> validIndexes = source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
+ int[] values = new int[valuesCount];
+ int count = 0;
+
+ @Override
+ public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
+ values[value.f1 - valuesStartFrom]++;
+ count++;
+
+ LOG.info("Reader " + getRuntimeContext().getIndexOfThisSubtask() + " got " + value + " count=" + count + "/" + finalCount);
+ // verify if we've seen everything
+
+ if (count == finalCount) {
+ LOG.info("Received all values");
+ for (int i = 0; i < values.length; i++) {
+ int v = values[i];
+ if (v != 3) {
+ LOG.warn("Test is going to fail");
+ printTopic(topicName, valuesCount, this.getRuntimeContext().getExecutionConfig());
+ throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
+ }
+ }
+ // test has passed
+ throw new SuccessException();
+ }
+ }
+
+ }).setParallelism(1);
+
+ tryExecute(env, "Read data from Kafka");
+
+ LOG.info("Successfully read sequence for verification");
+ }
+
+
+
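+ /**
+ * Writes the numbers {@code from} to {@code to} to each of the topic's three partitions; the writing
+ * subtask's index is stored in the tuple's f0 field and, via {@link T2Partitioner}, selects the target partition.
+ */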
+ private void writeSequence(StreamExecutionEnvironment env, String topicName, final int from, final int to) throws Exception {
+ LOG.info("Writing sequence from {} to {} to topic {}", from, to, topicName);
+
+ DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+ private static final long serialVersionUID = 1L;
+ int cnt = from;
+ int partition;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ partition = getRuntimeContext().getIndexOfThisSubtask();
+
+ }
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return cnt > to;
+ }
+
+ @Override
+ public Tuple2<Integer, Integer> next() throws Exception {
+ LOG.info("Writing " + cnt + " to partition " + partition);
+ Tuple2<Integer, Integer> result = new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), cnt);
+ cnt++;
+ return result;
+ }
+ }).setParallelism(3);
+
+ stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
+ topicName,
+ new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1, 1), env.getConfig()),
+ new T2Partitioner()
+ )).setParallelism(3);
+ env.execute("Write sequence from " + from + " to " + to + " to topic " + topicName);
+ LOG.info("Finished writing sequence");
+ }
+
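+ /** Routes each Tuple2 to the partition given by its f0 field, i.e. the index of the writing subtask. */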
+ private static class T2Partitioner implements SerializableKafkaPartitioner {
+ @Override
+ public int partition(Object key, int numPartitions) {
+ if(numPartitions != 3) {
+ throw new IllegalArgumentException("Expected three partitions");
+ }
+ Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
+ return element.f0;
+ }
+ }
+
+
+ @Test
+ public void regularKafkaSourceTest() throws Exception {
+ LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
+
+ String topic = "regularKafkaSourceTestTopic";
+ createTestTopic(topic, 1, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+ // add consuming topology:
+ DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+ new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), 5000));
+ consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+ int elCnt = 0;
+ int start = -1;
+ BitSet validator = new BitSet(101);
+
+ @Override
+ public void invoke(Tuple2<Long, String> value) throws Exception {
+ LOG.debug("Got value = " + value);
+ String[] sp = value.f1.split("-");
+ int v = Integer.parseInt(sp[1]);
+
+ assertEquals(value.f0 - 1000, (long) v);
+
+ if (start == -1) {
+ start = v;
+ }
+ Assert.assertFalse("Received tuple twice", validator.get(v - start));
+ validator.set(v - start);
+ elCnt++;
+ if (elCnt == 100) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) != 100) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
+ }
+ });
+
+ // add producing topology
+ DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+ private static final long serialVersionUID = 1L;
+ int cnt = 0;
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return false;
+ }
+
+ @Override
+ public Tuple2<Long, String> next() throws Exception {
+ Thread.sleep(100);
+ return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
+ }
+ });
+ stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
+
+ tryExecute(env, "regular kafka source test");
+
+ LOG.info("Finished KafkaITCase.regularKafkaSourceTest()");
+ }
+
+ @Test
+ public void tupleTestTopology() throws Exception {
+ LOG.info("Starting KafkaITCase.tupleTestTopology()");
+
+ String topic = "tupleTestTopic";
+ createTestTopic(topic, 1, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ // add consuming topology:
+ DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+ new PersistentKafkaSource<Tuple2<Long, String>>(topic,
+ new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
+ standardCC
+ ));
+ consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
+ int elCnt = 0;
+ int start = -1;
+ BitSet validator = new BitSet(101);
+
+ @Override
+ public void invoke(Tuple2<Long, String> value) throws Exception {
+ LOG.info("Got value " + value);
+ String[] sp = value.f1.split("-");
+ int v = Integer.parseInt(sp[1]);
+
+ assertEquals(value.f0 - 1000, (long) v);
+
+ if (start == -1) {
+ start = v;
+ }
+ Assert.assertFalse("Received tuple twice", validator.get(v - start));
+ validator.set(v - start);
+ elCnt++;
+ if (elCnt == 100) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) != 100) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ Assert.assertTrue("No element received", elCnt > 0);
+ }
+ });
+
+ // add producing topology
+ DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+ private static final long serialVersionUID = 1L;
+ int cnt = 0;
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return false;
+ }
+
+ @Override
+ public Tuple2<Long, String> next() throws Exception {
+ Thread.sleep(100);
+ return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
+ }
+ });
+ stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
+
+ tryExecute(env, "tupletesttopology");
+
+ LOG.info("Finished KafkaITCase.tupleTestTopology()");
+ }
+
+ /**
+ * Tests Flink's Kafka integration with very big records (30MB).
+ *
+ * See http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
+ */
+ @Test
+ public void bigRecordTestTopology() throws Exception {
+
+ LOG.info("Starting KafkaITCase.bigRecordTestTopology()");
+
+ String topic = "bigRecordTestTopic";
+ createTestTopic(topic, 1, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ // add consuming topology:
+ Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema = new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig());
+ Properties consumerProps = new Properties();
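+ // raise the consumer fetch size so that the ~30 MB test records fit into a single fetch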
+ consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 30));
+ consumerProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+ consumerProps.setProperty("group.id", "test");
+ consumerProps.setProperty("auto.commit.enable", "false");
+ consumerProps.setProperty("auto.offset.reset", "smallest");
+
+ ConsumerConfig cc = new ConsumerConfig(consumerProps);
+ DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(
+ new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, cc));
+
+ consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
+ int elCnt = 0;
+
+ @Override
+ public void invoke(Tuple2<Long, byte[]> value) throws Exception {
+ LOG.info("Received {}", value.f0);
+ elCnt++;
+ if(value.f0 == -1) {
+ // we should have seen 11 elements now.
+ if(elCnt == 11) {
+ throw new SuccessException();
+ } else {
+ throw new RuntimeException("There have been "+elCnt+" elements");
+ }
+ }
+ if(elCnt > 10) {
+ throw new RuntimeException("More than 10 elements seen: "+elCnt);
+ }
+ }
+ }).setParallelism(1);
+
+ // add producing topology
+ DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
+ private static final long serialVersionUID = 1L;
+ boolean running = true;
+ long cnt;
+ transient Random rnd;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ cnt = 0;
+ rnd = new Random(1337);
+
+ }
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return cnt > 10;
+ }
+
+ @Override
+ public Tuple2<Long, byte[]> next() throws Exception {
+ Thread.sleep(100);
+
+ if (cnt < 10) {
+ byte[] wl = new byte[Math.abs(rnd.nextInt(1024 * 1024 * 30))];
+ Tuple2<Long, byte[]> result = new Tuple2<Long, byte[]>(cnt++, wl);
+ LOG.info("Emitted cnt=" + (cnt - 1) + " with byte.length = " + wl.length);
+ return result;
+
+ } else if (cnt == 10) {
+ Tuple2<Long, byte[]> result = new Tuple2<Long, byte[]>(-1L, new byte[]{1});
+ cnt++;
+ return result;
+ } else {
+ throw new RuntimeException("Source is exhausted.");
+ }
+ }
+ });
+
+ stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
+ new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig()))
+ );
+
+ tryExecute(env, "big topology test");
+
+ LOG.info("Finished KafkaITCase.bigRecordTestTopology()");
+ }
+
+
+ @Test
+ public void customPartitioningTestTopology() throws Exception {
+ LOG.info("Starting KafkaITCase.customPartitioningTestTopology()");
+
+ String topic = "customPartitioningTestTopic";
+
+ createTestTopic(topic, 3, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ // add consuming topology:
+ DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+ new PersistentKafkaSource<Tuple2<Long, String>>(topic,
+ new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
+ standardCC));
+ consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+ int start = -1;
+ BitSet validator = new BitSet(101);
+
+ boolean gotPartition1 = false;
+ boolean gotPartition2 = false;
+ boolean gotPartition3 = false;
+
+ @Override
+ public void invoke(Tuple2<Long, String> value) throws Exception {
+ LOG.debug("Got " + value);
+ String[] sp = value.f1.split("-");
+ int v = Integer.parseInt(sp[1]);
+
+ assertEquals(value.f0 - 1000, (long) v);
+
+ switch (v) {
+ case 9:
+ gotPartition1 = true;
+ break;
+ case 19:
+ gotPartition2 = true;
+ break;
+ case 99:
+ gotPartition3 = true;
+ break;
+ }
+
+ if (start == -1) {
+ start = v;
+ }
+ Assert.assertFalse("Received tuple twice", validator.get(v - start));
+ validator.set(v - start);
+
+ if (gotPartition1 && gotPartition2 && gotPartition3) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) != 100) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
+ }
+ });
+
+ // add producing topology
+ DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+ private static final long serialVersionUID = 1L;
+ int cnt = 0;
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return false;
+ }
+
+ @Override
+ public Tuple2<Long, String> next() throws Exception {
+ Thread.sleep(100);
+ return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
+ }
+ });
+ stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), new CustomPartitioner()));
+
+ tryExecute(env, "custom partitioning test");
+
+ LOG.info("Finished KafkaITCase.customPartitioningTestTopology()");
+ }
+
+ /**
+ * This is for a topic with 3 partitions and Tuple2<Long, String>
+ */
+ private static class CustomPartitioner implements SerializableKafkaPartitioner {
+
+ @Override
+ public int partition(Object key, int numPartitions) {
+
+ @SuppressWarnings("unchecked")
+ Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
+ if (tuple.f0 < 10) {
+ return 0;
+ } else if (tuple.f0 < 20) {
+ return 1;
+ } else {
+ return 2;
+ }
+ }
+ }
+
+
+ @Test
+ public void simpleTestTopology() throws Exception {
+ String topic = "simpleTestTopic";
+
+ createTestTopic(topic, 1, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ // add consuming topology:
+ DataStreamSource<String> consuming = env.addSource(
+ new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
+ consuming.addSink(new SinkFunction<String>() {
+ int elCnt = 0;
+ int start = -1;
+ BitSet validator = new BitSet(101);
+
+ @Override
+ public void invoke(String value) throws Exception {
+ LOG.debug("Got " + value);
+ String[] sp = value.split("-");
+ int v = Integer.parseInt(sp[1]);
+ if (start == -1) {
+ start = v;
+ }
+ Assert.assertFalse("Received tuple twice", validator.get(v - start));
+ validator.set(v - start);
+ elCnt++;
+ if (elCnt == 100) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) != 100) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
+ }
+ });
+
+ // add producing topology
+ DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+ private static final long serialVersionUID = 1L;
+ boolean running = true;
+ int cnt = 0;
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return false;
+ }
+
+ @Override
+ public String next() throws Exception {
+ Thread.sleep(100);
+ return "kafka-" + cnt++;
+ }
+ });
+ stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()));
+
+ tryExecute(env, "simpletest");
+ }
+
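+ // flags shared between the consuming sink and the broker-killing thread in brokerFailureTest()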
+ private static boolean leaderHasShutDown = false;
+ private static boolean shutdownKafkaBroker;
+
+ @Test(timeout=60000)
+ public void brokerFailureTest() throws Exception {
+ String topic = "brokerFailureTestTopic";
+
+ createTestTopic(topic, 2, 2);
+
+ // --------------------------- write data to topic ---------------------
+ LOG.info("Writing data to topic {}", topic);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+
+ private int cnt = 0;
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return cnt == 200;
+ }
+
+ @Override
+ public String next() throws Exception {
+ String msg = "kafka-" + cnt++;
+ LOG.info("sending message = "+msg);
+
+ if ((cnt - 1) % 20 == 0) {
+ LOG.debug("Sending message #{}", cnt - 1);
+ }
+
+ return msg;
+ }
+
+ });
+ stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
+ .setParallelism(1);
+
+ tryExecute(env, "broker failure test - writer");
+
+ // --------------------------- read and let broker fail ---------------------
+
+ LOG.info("Reading data from topic {} and let a broker fail", topic);
+ PartitionMetadata firstPart = null;
+ do {
+ if(firstPart != null) {
+ LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
+ // not the first try. Sleep a bit
+ Thread.sleep(150);
+ }
+ Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
+ firstPart = partitionMetadata.head();
+ } while(firstPart.errorCode() != 0);
+
+ final String leaderToShutDown = firstPart.leader().get().connectionString();
+ LOG.info("Leader to shutdown {}", leaderToShutDown);
+
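+ // background thread that waits for the sink below to set the flag and then shuts down the leading broker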
+ final Thread brokerShutdown = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ shutdownKafkaBroker = false;
+ while (!shutdownKafkaBroker) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ LOG.warn("Interruption", e);
+ }
+ }
+
+ for (KafkaServer kafkaServer : brokers) {
+ if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
+ LOG.info("Killing Kafka Server {}", leaderToShutDown);
+ kafkaServer.shutdown();
+ leaderHasShutDown = true;
+ break;
+ }
+ }
+ }
+ });
+ brokerShutdown.start();
+
+ // add consuming topology:
+ DataStreamSource<String> consuming = env.addSource(new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
+ consuming.setParallelism(1);
+
+ consuming.addSink(new SinkFunction<String>() {
+ int elCnt = 0;
+ int start = 0;
+ int numOfMessagesToBeCorrect = 100;
+ int stopAfterMessages = 150;
+
+ BitSet validator = new BitSet(numOfMessagesToBeCorrect + 1);
+
+ @Override
+ public void invoke(String value) throws Exception {
+ LOG.info("Got message = " + value + " leader has shut down " + leaderHasShutDown + " el cnt = " + elCnt + " to rec" + numOfMessagesToBeCorrect);
+ String[] sp = value.split("-");
+ int v = Integer.parseInt(sp[1]);
+
+ // 'start' stays at 0 here, so the offset is simply the received value itself
+ int offset = v - start;
+ Assert.assertFalse("Received tuple with value " + offset + " twice", validator.get(offset));
+ if (v - start < 0 && LOG.isWarnEnabled()) {
+ LOG.warn("Not in order: {}", value);
+ }
+
+ validator.set(offset);
+ elCnt++;
+ if (elCnt == 20) {
+ LOG.info("Asking leading broker to shut down");
+ // shut down a Kafka broker
+ shutdownKafkaBroker = true;
+ }
+ if (shutdownKafkaBroker) {
+ // we become a bit slower because the shutdown takes some time and we have
+ // only a fixed number of elements to read
+ Thread.sleep(20);
+ }
+ if (leaderHasShutDown) { // it only makes sense to check once the shutdown is completed
+ if (elCnt >= stopAfterMessages) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) < numOfMessagesToBeCorrect) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements to be checked. Next clear:" + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
+ }
+ }
+ });
+ tryExecute(env, "broker failure test - reader");
+
+ }
+
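+ /**
+ * Executes the topology and walks the failure cause chain: a SuccessException means the test passed,
+ * anything else (or no SuccessException within 20 causes) fails the test.
+ */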
+ public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
+ try {
+ see.execute(name);
+ } catch (JobExecutionException good) {
+ Throwable t = good.getCause();
+ int limit = 0;
+ while (!(t instanceof SuccessException)) {
+ if(t == null) {
+ LOG.warn("Test failed with exception", good);
+ Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
+ }
+
+ t = t.getCause();
+ if (limit++ == 20) {
+ LOG.warn("Test failed with exception", good);
+ Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
+ }
+ }
+ }
+ }
+
+ private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
+ // create topic
+ Properties topicConfig = new Properties();
+ LOG.info("Creating topic {}", topic);
+ AdminUtils.createTopic(zkClient, topic, numberOfPartitions, replicationFactor, topicConfig);
+ }
+
+ private static TestingServer getZookeeper() throws Exception {
+ return new TestingServer(zkPort, tmpZkDir);
+ }
+
+ /**
+ * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+ */
+ private static KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws UnknownHostException {
+ Properties kafkaProperties = new Properties();
+
+ int kafkaPort = NetUtils.getAvailablePort();
+
+ // properties have to be Strings
+ kafkaProperties.put("advertised.host.name", kafkaHost);
+ kafkaProperties.put("port", Integer.toString(kafkaPort));
+ kafkaProperties.put("broker.id", Integer.toString(brokerId));
+ kafkaProperties.put("log.dir", tmpFolder.toString());
+ kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+ kafkaProperties.put("message.max.bytes", "" + (35 * 1024 * 1024));
+ kafkaProperties.put("replica.fetch.max.bytes", "" + (35 * 1024 * 1024));
+ KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+ KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
+ server.startup();
+ return server;
+ }
+
+ public static class SuccessException extends Exception {
+ private static final long serialVersionUID = 1L;
+ }
+
+
+ // ----------------------- Debugging utilities --------------------
+
+ /**
+ * Reads a topic into a list, using only Kafka code.
+ *
+ * @return at most {@code stopAfter} messages read from the topic
+ */
+ private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
+ ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
+ // we request only one stream per consumer instance. Kafka will make sure that each consumer group
+ // will see each message only once.
+ Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+ Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
+ if(streams.size() != 1) {
+ throw new RuntimeException("Expected only one message stream but got "+streams.size());
+ }
+ List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+ if(kafkaStreams == null) {
+ throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
+ }
+ if(kafkaStreams.size() != 1) {
+ throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
+ }
+ LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
+ ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
+
+ List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
+ int read = 0;
+ while(iteratorToRead.hasNext()) {
+ read++;
+ result.add(iteratorToRead.next());
+ if(read == stopAfter) {
+ LOG.info("Read "+read+" elements");
+ return result;
+ }
+ }
+ return result;
+ }
+
+ private static void printTopic(String topicName, ConsumerConfig config, DeserializationSchema deserializationSchema, int stopAfter){
+ List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
+ LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
+ for(MessageAndMetadata<byte[], byte[]> message: contents) {
+ Object out = deserializationSchema.deserialize(message.message());
+ LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
+ }
+ }
+
+ private static void printTopic(String topicName, int elements, ExecutionConfig ec) {
+ // write the sequence to log for debugging purposes
+ Properties stdProps = standardCC.props().props();
+ Properties newProps = new Properties(stdProps);
+ newProps.setProperty("group.id", "topic-printer"+UUID.randomUUID().toString());
+ newProps.setProperty("auto.offset.reset", "smallest");
+ newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
+
+ ConsumerConfig printerConfig = new ConsumerConfig(newProps);
+ DeserializationSchema deserializer = new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), ec);
+ printTopic(topicName, printerConfig, deserializer, elements);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0fd60233/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java
new file mode 100644
index 0000000..18fa46f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.utils.Time;
+
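+/** A {@link kafka.utils.Time} implementation backed by the system clock, for embedding Kafka brokers in tests. */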
+public class KafkaLocalSystemTime implements Time {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
+
+ @Override
+ public long milliseconds() {
+ return System.currentTimeMillis();
+ }
+
+ public long nanoseconds() {
+ return System.nanoTime();
+ }
+
+ @Override
+ public void sleep(long ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ LOG.warn("Interruption", e);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/0fd60233/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/KafkaITCase.java
deleted file mode 100644
index 7b7bdcc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/KafkaITCase.java
+++ /dev/null
@@ -1,1023 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionMetadata;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.network.SocketServer;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.net.NetUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
-import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import scala.collection.Seq;
-
-/**
- * Code in this test is based on the following GitHub repository:
- * (as per commit bc6b2b2d5f6424d5f377aa6c0871e82a956462ef)
- * <p/>
- * https://github.com/sakserv/hadoop-mini-clusters (ASL licensed)
- */
-
-public class KafkaITCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaITCase.class);
- private static final int NUMBER_OF_KAFKA_SERVERS = 3;
-
- private static int zkPort;
- private static String kafkaHost;
-
- private static String zookeeperConnectionString;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
- public static File tmpZkDir;
- public static List<File> tmpKafkaDirs;
-
- private static TestingServer zookeeper;
- private static List<KafkaServer> brokers;
- private static String brokerConnectionStrings = "";
-
- private static ConsumerConfig standardCC;
-
- private static ZkClient zkClient;
-
-
- @BeforeClass
- public static void prepare() throws IOException {
- LOG.info("Starting KafkaITCase.prepare()");
- tmpZkDir = tempFolder.newFolder();
-
- tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_KAFKA_SERVERS);
- for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
- tmpKafkaDirs.add(tempFolder.newFolder());
- }
-
- kafkaHost = InetAddress.getLocalHost().getHostName();
- zkPort = NetUtils.getAvailablePort();
- zookeeperConnectionString = "localhost:" + zkPort;
-
- zookeeper = null;
- brokers = null;
-
- try {
- LOG.info("Starting Zookeeper");
- zookeeper = getZookeeper();
- LOG.info("Starting KafkaServer");
- brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
- for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
- brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
- SocketServer socketServer = brokers.get(i).socketServer();
- String host = "localhost";
- if(socketServer.host() != null) {
- host = socketServer.host();
- }
- brokerConnectionStrings += host+":"+socketServer.port()+",";
- }
-
- LOG.info("ZK and KafkaServer started.");
- } catch (Throwable t) {
- LOG.warn("Test failed with exception", t);
- Assert.fail("Test failed with: " + t.getMessage());
- }
-
- Properties cProps = new Properties();
- cProps.setProperty("zookeeper.connect", zookeeperConnectionString);
- cProps.setProperty("group.id", "flink-tests");
- cProps.setProperty("auto.commit.enable", "false");
-
- cProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning.
-
- standardCC = new ConsumerConfig(cProps);
-
- zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
- }
-
- @AfterClass
- public static void shutDownServices() {
- LOG.info("Shutting down all services");
- for (KafkaServer broker : brokers) {
- if (broker != null) {
- broker.shutdown();
- }
- }
- if (zookeeper != null) {
- try {
- zookeeper.stop();
- } catch (IOException e) {
- LOG.warn("ZK.stop() failed", e);
- }
- }
- zkClient.close();
- }
-
-
- @Test
- public void testOffsetManipulation() {
- ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
-
- final String topicName = "testOffsetManipulation";
-
- // create topic
- Properties topicConfig = new Properties();
- LOG.info("Creating topic {}", topicName);
- AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
-
- PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 1337);
-
- Assert.assertEquals(1337L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
-
- zk.close();
- }
- /**
- * We want to use the High level java consumer API but manage the offset in Zookeeper manually.
- *
- */
- @Test
- public void testPersistentSourceWithOffsetUpdates() throws Exception {
- LOG.info("Starting testPersistentSourceWithOffsetUpdates()");
-
- ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
-
- final String topicName = "testOffsetHacking";
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3);
- env.getConfig().disableSysoutLogging();
- env.enableCheckpointing(50);
- env.setNumberOfExecutionRetries(0);
-
- // create topic
- Properties topicConfig = new Properties();
- LOG.info("Creating topic {}", topicName);
- AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
-
- // write a sequence from 0 to 99 to each of the three partitions.
- writeSequence(env, topicName, 0, 99);
-
-
- readSequence(env, standardCC, topicName, 0, 100, 300);
-
- // check offsets to be set at least higher than 50.
- // correctly, we would expect them to be set to 99, but right now there is no way of stopping a topology once all pending
- // checkpoints have been committed.
- // To work around that limitation, the persistent kafka consumer is throtteled with a thread.sleep().
- long o1 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0);
- long o2 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1);
- long o3 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2);
- Assert.assertTrue("The offset seems incorrect, got "+o1, o1 > 50L);
- Assert.assertTrue("The offset seems incorrect, got "+o2, o2 > 50L);
- Assert.assertTrue("The offset seems incorrect, got "+o3, o3 > 50L);
- /** Once we have proper shutdown of streaming jobs, enable these tests
- Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
- Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1));
- Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2));*/
-
-
- LOG.info("Manipulating offsets");
- // set the offset to 25, 50, and 75 for the three partitions
- PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 50);
- PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 1, 50);
- PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 2, 50);
-
- // create new env
- env = StreamExecutionEnvironment.createLocalEnvironment(3);
- env.getConfig().disableSysoutLogging();
- readSequence(env, standardCC, topicName, 50, 50, 150);
-
- zk.close();
-
- LOG.info("Finished testPersistentSourceWithOffsetUpdates()");
- }
-
- private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, final String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
- LOG.info("Reading sequence for verification until final count {}", finalCount);
- DataStream<Tuple2<Integer, Integer>> source = env.addSource(
- new PersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), env.getConfig()), cc)
- )
- //add a sleeper mapper. Since there is no good way of "shutting down" a running topology, we have
- // to play this trick. The problem is that we have to wait until all checkpoints are confirmed
- .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
- @Override
- public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
- Thread.sleep(150);
- return value;
- }
- }).setParallelism(3);
-
- // verify data
- DataStream<Integer> validIndexes = source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
- int[] values = new int[valuesCount];
- int count = 0;
-
- @Override
- public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
- values[value.f1 - valuesStartFrom]++;
- count++;
-
- LOG.info("Reader " + getRuntimeContext().getIndexOfThisSubtask() + " got " + value + " count=" + count + "/" + finalCount);
- // verify if we've seen everything
-
- if (count == finalCount) {
- LOG.info("Received all values");
- for (int i = 0; i < values.length; i++) {
- int v = values[i];
- if (v != 3) {
- LOG.warn("Test is going to fail");
- printTopic(topicName, valuesCount, this.getRuntimeContext().getExecutionConfig());
- throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
- }
- }
- // test has passed
- throw new SuccessException();
- }
- }
-
- }).setParallelism(1);
-
- tryExecute(env, "Read data from Kafka");
-
- LOG.info("Successfully read sequence for verification");
- }
-
-
-
- private void writeSequence(StreamExecutionEnvironment env, String topicName, final int from, final int to) throws Exception {
- LOG.info("Writing sequence from {} to {} to topic {}", from, to, topicName);
-
- DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
- private static final long serialVersionUID = 1L;
- int cnt = from;
- int partition;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- partition = getRuntimeContext().getIndexOfThisSubtask();
-
- }
-
- @Override
- public boolean reachedEnd() throws Exception {
- return cnt > to;
- }
-
- @Override
- public Tuple2<Integer, Integer> next() throws Exception {
- LOG.info("Writing " + cnt + " to partition " + partition);
- Tuple2<Integer, Integer> result = new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), cnt);
- cnt++;
- return result;
- }
- }).setParallelism(3);
-
- stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
- topicName,
- new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1, 1), env.getConfig()),
- new T2Partitioner()
- )).setParallelism(3);
- env.execute("Write sequence from " + from + " to " + to + " to topic " + topicName);
- LOG.info("Finished writing sequence");
- }
-
- private static class T2Partitioner implements SerializableKafkaPartitioner {
- @Override
- public int partition(Object key, int numPartitions) {
- if(numPartitions != 3) {
- throw new IllegalArgumentException("Expected three partitions");
- }
- Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
- return element.f0;
- }
- }
-
-
- @Test
- public void regularKafkaSourceTest() throws Exception {
- LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
-
- String topic = "regularKafkaSourceTestTopic";
- createTestTopic(topic, 1, 1);
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
- // add consuming topology:
- DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
- new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), 5000));
- consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
- int elCnt = 0;
- int start = -1;
- BitSet validator = new BitSet(101);
-
- @Override
- public void invoke(Tuple2<Long, String> value) throws Exception {
- LOG.debug("Got value = " + value);
- String[] sp = value.f1.split("-");
- int v = Integer.parseInt(sp[1]);
-
- assertEquals(value.f0 - 1000, (long) v);
-
- if (start == -1) {
- start = v;
- }
- Assert.assertFalse("Received tuple twice", validator.get(v - start));
- validator.set(v - start);
- elCnt++;
- if (elCnt == 100) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) != 100) {
- throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
- }
- throw new SuccessException();
- }
- }
- });
-
- // add producing topology
- DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
- private static final long serialVersionUID = 1L;
- int cnt = 0;
-
- @Override
- public boolean reachedEnd() throws Exception {
- return false;
- }
-
- @Override
- public Tuple2<Long, String> next() throws Exception {
- Thread.sleep(100);
- return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
- }
- });
- stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
-
- tryExecute(env, "regular kafka source test");
-
- LOG.info("Finished KafkaITCase.regularKafkaSourceTest()");
- }
-
- @Test
- public void tupleTestTopology() throws Exception {
- LOG.info("Starting KafkaITCase.tupleTestTopology()");
-
- String topic = "tupleTestTopic";
- createTestTopic(topic, 1, 1);
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- // add consuming topology:
- DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
- new PersistentKafkaSource<Tuple2<Long, String>>(topic,
- new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
- standardCC
- ));
- consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
- int elCnt = 0;
- int start = -1;
- BitSet validator = new BitSet(101);
-
- @Override
- public void invoke(Tuple2<Long, String> value) throws Exception {
- LOG.info("Got value " + value);
- String[] sp = value.f1.split("-");
- int v = Integer.parseInt(sp[1]);
-
- assertEquals(value.f0 - 1000, (long) v);
-
- if (start == -1) {
- start = v;
- }
- Assert.assertFalse("Received tuple twice", validator.get(v - start));
- validator.set(v - start);
- elCnt++;
- if (elCnt == 100) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) != 100) {
- throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
- }
- throw new SuccessException();
- }
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- Assert.assertTrue("No element received", elCnt > 0);
- }
- });
-
- // add producing topology
- DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
- private static final long serialVersionUID = 1L;
- int cnt = 0;
-
- @Override
- public boolean reachedEnd() throws Exception {
- return false;
- }
-
- @Override
- public Tuple2<Long, String> next() throws Exception {
- Thread.sleep(100);
- return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
- }
- });
- stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
-
- tryExecute(env, "tupletesttopology");
-
- LOG.info("Finished KafkaITCase.tupleTestTopology()");
- }
-
- /**
- * Tests Flink's Kafka integration with very large records (30 MB).
- *
- * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
- *
- * @throws Exception
- */
- @Test
- public void bigRecordTestTopology() throws Exception {
-
- LOG.info("Starting KafkaITCase.bigRecordTestTopology()");
-
- String topic = "bigRecordTestTopic";
- createTestTopic(topic, 1, 1);
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- // add consuming topology:
- Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema = new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig());
- Properties consumerProps = new Properties();
- consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 30));
- consumerProps.setProperty("zookeeper.connect", zookeeperConnectionString);
- consumerProps.setProperty("group.id", "test");
- consumerProps.setProperty("auto.commit.enable", "false");
- consumerProps.setProperty("auto.offset.reset", "smallest");
-
- ConsumerConfig cc = new ConsumerConfig(consumerProps);
- DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(
- new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, cc));
-
- consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
- int elCnt = 0;
-
- @Override
- public void invoke(Tuple2<Long, byte[]> value) throws Exception {
- LOG.info("Received {}", value.f0);
- elCnt++;
- if(value.f0 == -1) {
- // we should have seen 11 elements now.
- if(elCnt == 11) {
- throw new SuccessException();
- } else {
- throw new RuntimeException("There have been "+elCnt+" elements");
- }
- }
- if(elCnt > 10) {
- throw new RuntimeException("More than 10 elements seen: "+elCnt);
- }
- }
- }).setParallelism(1);
-
- // add producing topology
- DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
- private static final long serialVersionUID = 1L;
- boolean running = true;
- long cnt;
- transient Random rnd;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- cnt = 0;
- rnd = new Random(1337);
-
- }
-
- @Override
- public boolean reachedEnd() throws Exception {
- return cnt > 10;
- }
-
- @Override
- public Tuple2<Long, byte[]> next() throws Exception {
- Thread.sleep(100);
-
- if (cnt < 10) {
- byte[] wl = new byte[Math.abs(rnd.nextInt(1024 * 1024 * 30))];
- Tuple2<Long, byte[]> result = new Tuple2<Long, byte[]>(cnt++, wl);
- LOG.info("Emitted cnt=" + (cnt - 1) + " with byte.length = " + wl.length);
- return result;
-
- } else if (cnt == 10) {
- Tuple2<Long, byte[]> result = new Tuple2<Long, byte[]>(-1L, new byte[]{1});
- cnt++;
- return result;
- } else {
- throw new RuntimeException("Source is exhausted.");
- }
- }
- });
-
- stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
- new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig()))
- );
-
- tryExecute(env, "big topology test");
-
- LOG.info("Finished KafkaITCase.bigRecordTestTopology()");
- }
-
-
- @Test
- public void customPartitioningTestTopology() throws Exception {
- LOG.info("Starting KafkaITCase.customPartitioningTestTopology()");
-
- String topic = "customPartitioningTestTopic";
-
- createTestTopic(topic, 3, 1);
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- // add consuming topology:
- DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
- new PersistentKafkaSource<Tuple2<Long, String>>(topic,
- new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
- standardCC));
- consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
- int start = -1;
- BitSet validator = new BitSet(101);
-
- boolean gotPartition1 = false;
- boolean gotPartition2 = false;
- boolean gotPartition3 = false;
-
- @Override
- public void invoke(Tuple2<Long, String> value) throws Exception {
- LOG.debug("Got " + value);
- String[] sp = value.f1.split("-");
- int v = Integer.parseInt(sp[1]);
-
- assertEquals(value.f0 - 1000, (long) v);
-
- switch (v) {
- case 9:
- gotPartition1 = true;
- break;
- case 19:
- gotPartition2 = true;
- break;
- case 99:
- gotPartition3 = true;
- break;
- }
-
- if (start == -1) {
- start = v;
- }
- Assert.assertFalse("Received tuple twice", validator.get(v - start));
- validator.set(v - start);
-
- if (gotPartition1 && gotPartition2 && gotPartition3) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) != 100) {
- throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
- }
- throw new SuccessException();
- }
- }
- });
-
- // add producing topology
- DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
- private static final long serialVersionUID = 1L;
- int cnt = 0;
-
- @Override
- public boolean reachedEnd() throws Exception {
- return false;
- }
-
- @Override
- public Tuple2<Long, String> next() throws Exception {
- Thread.sleep(100);
- return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
- }
- });
- stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), new CustomPartitioner()));
-
- tryExecute(env, "custom partitioning test");
-
- LOG.info("Finished KafkaITCase.customPartitioningTestTopology()");
- }
-
- /**
- * This is for a topic with 3 partitions and Tuple2<Long, String>
- */
- private static class CustomPartitioner implements SerializableKafkaPartitioner {
-
- @Override
- public int partition(Object key, int numPartitions) {
-
- @SuppressWarnings("unchecked")
- Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
- if (tuple.f0 < 10) {
- return 0;
- } else if (tuple.f0 < 20) {
- return 1;
- } else {
- return 2;
- }
- }
- }
-
-
- @Test
- public void simpleTestTopology() throws Exception {
- String topic = "simpleTestTopic";
-
- createTestTopic(topic, 1, 1);
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- // add consuming topology:
- DataStreamSource<String> consuming = env.addSource(
- new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
- consuming.addSink(new SinkFunction<String>() {
- int elCnt = 0;
- int start = -1;
- BitSet validator = new BitSet(101);
-
- @Override
- public void invoke(String value) throws Exception {
- LOG.debug("Got " + value);
- String[] sp = value.split("-");
- int v = Integer.parseInt(sp[1]);
- if (start == -1) {
- start = v;
- }
- Assert.assertFalse("Received tuple twice", validator.get(v - start));
- validator.set(v - start);
- elCnt++;
- if (elCnt == 100) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) != 100) {
- throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
- }
- throw new SuccessException();
- }
- }
- });
-
- // add producing topology
- DataStream<String> stream = env.addSource(new SourceFunction<String>() {
- private static final long serialVersionUID = 1L;
- boolean running = true;
- int cnt = 0;
-
- @Override
- public boolean reachedEnd() throws Exception {
- return false;
- }
-
- @Override
- public String next() throws Exception {
- Thread.sleep(100);
- return "kafka-" + cnt++;
- }
- });
- stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()));
-
- tryExecute(env, "simpletest");
- }
-
- private static boolean leaderHasShutDown = false;
- private static boolean shutdownKafkaBroker;
-
- @Test(timeout=60000)
- public void brokerFailureTest() throws Exception {
- String topic = "brokerFailureTestTopic";
-
- createTestTopic(topic, 2, 2);
-
- // --------------------------- write data to topic ---------------------
- LOG.info("Writing data to topic {}", topic);
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- DataStream<String> stream = env.addSource(new SourceFunction<String>() {
-
- private int cnt = 0;
-
- @Override
- public boolean reachedEnd() throws Exception {
- return cnt == 200;
- }
-
- @Override
- public String next() throws Exception {
- String msg = "kafka-" + cnt++;
- LOG.info("sending message = "+msg);
-
- if ((cnt - 1) % 20 == 0) {
- LOG.debug("Sending message #{}", cnt - 1);
- }
-
- return msg;
- }
-
- });
- stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
- .setParallelism(1);
-
- tryExecute(env, "broker failure test - writer");
-
- // --------------------------- read and let broker fail ---------------------
-
- LOG.info("Reading data from topic {} and let a broker fail", topic);
- PartitionMetadata firstPart = null;
- do {
- if(firstPart != null) {
- LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
- // not the first try. Sleep a bit
- Thread.sleep(150);
- }
- Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
- firstPart = partitionMetadata.head();
- } while(firstPart.errorCode() != 0);
-
- final String leaderToShutDown = firstPart.leader().get().connectionString();
- LOG.info("Leader to shutdown {}", leaderToShutDown);
-
- final Thread brokerShutdown = new Thread(new Runnable() {
- @Override
- public void run() {
- shutdownKafkaBroker = false;
- while (!shutdownKafkaBroker) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOG.warn("Interruption", e);
- }
- }
-
- for (KafkaServer kafkaServer : brokers) {
- if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
- LOG.info("Killing Kafka Server {}", leaderToShutDown);
- kafkaServer.shutdown();
- leaderHasShutDown = true;
- break;
- }
- }
- }
- });
- brokerShutdown.start();
-
- // add consuming topology:
- DataStreamSource<String> consuming = env.addSource(new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
- consuming.setParallelism(1);
-
- consuming.addSink(new SinkFunction<String>() {
- int elCnt = 0;
- int start = 0;
- int numOfMessagesToBeCorrect = 100;
- int stopAfterMessages = 150;
-
- BitSet validator = new BitSet(numOfMessagesToBeCorrect + 1);
-
- @Override
- public void invoke(String value) throws Exception {
- LOG.info("Got message = " + value + " leader has shut down " + leaderHasShutDown + " el cnt = " + elCnt + " to rec" + numOfMessagesToBeCorrect);
- String[] sp = value.split("-");
- int v = Integer.parseInt(sp[1]);
-
- if (start == -1) {
- start = v;
- }
- int offset = v - start;
- Assert.assertFalse("Received tuple with value " + offset + " twice", validator.get(offset));
- if (v - start < 0 && LOG.isWarnEnabled()) {
- LOG.warn("Not in order: {}", value);
- }
-
- validator.set(offset);
- elCnt++;
- if (elCnt == 20) {
- LOG.info("Asking leading broker to shut down");
- // shut down a Kafka broker
- shutdownKafkaBroker = true;
- }
- if (shutdownKafkaBroker) {
- // we become a bit slower because the shutdown takes some time and we have
- // only a fixed number of elements to read
- Thread.sleep(20);
- }
- if (leaderHasShutDown) { // it only makes sense to check once the shutdown is completed
- if (elCnt >= stopAfterMessages) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) < numOfMessagesToBeCorrect) {
- throw new RuntimeException("The bitset was not set to 1 on all elements to be checked. Next clear:" + nc + " Set: " + validator);
- }
- throw new SuccessException();
- }
- }
- }
- });
- tryExecute(env, "broker failure test - reader");
-
- }
-
- public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
- try {
- see.execute(name);
- } catch (JobExecutionException good) {
- Throwable t = good.getCause();
- int limit = 0;
- while (!(t instanceof SuccessException)) {
- if(t == null) {
- LOG.warn("Test failed with exception", good);
- Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
- }
-
- t = t.getCause();
- if (limit++ == 20) {
- LOG.warn("Test failed with exception", good);
- Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
- }
- }
- }
- }
-
- private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
- // create topic
- Properties topicConfig = new Properties();
- LOG.info("Creating topic {}", topic);
- AdminUtils.createTopic(zkClient, topic, numberOfPartitions, replicationFactor, topicConfig);
- }
-
- private static TestingServer getZookeeper() throws Exception {
- return new TestingServer(zkPort, tmpZkDir);
- }
-
- /**
- * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
- */
- private static KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws UnknownHostException {
- Properties kafkaProperties = new Properties();
-
- int kafkaPort = NetUtils.getAvailablePort();
-
- // properties have to be Strings
- kafkaProperties.put("advertised.host.name", kafkaHost);
- kafkaProperties.put("port", Integer.toString(kafkaPort));
- kafkaProperties.put("broker.id", Integer.toString(brokerId));
- kafkaProperties.put("log.dir", tmpFolder.toString());
- kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
- kafkaProperties.put("message.max.bytes", "" + (35 * 1024 * 1024));
- kafkaProperties.put("replica.fetch.max.bytes", "" + (35 * 1024 * 1024));
- KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
- KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
- server.startup();
- return server;
- }
-
- public static class SuccessException extends Exception {
- private static final long serialVersionUID = 1L;
- }
-
-
- // ----------------------- Debugging utilities --------------------
-
- /**
- * Reads a topic into a list, using only Kafka code.
- * @return the messages read from the topic, at most stopAfter elements
- */
- private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
- ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
- // we request only one stream per consumer instance. Kafka will make sure that each consumer group
- // will see each message only once.
- Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
- Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
- if(streams.size() != 1) {
- throw new RuntimeException("Expected only one message stream but got "+streams.size());
- }
- List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
- if(kafkaStreams == null) {
- throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
- }
- if(kafkaStreams.size() != 1) {
- throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
- }
- LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
- ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
-
- List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
- int read = 0;
- while(iteratorToRead.hasNext()) {
- read++;
- result.add(iteratorToRead.next());
- if(read == stopAfter) {
- LOG.info("Read "+read+" elements");
- return result;
- }
- }
- return result;
- }
-
- private static void printTopic(String topicName, ConsumerConfig config, DeserializationSchema deserializationSchema, int stopAfter){
- List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
- LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
- for(MessageAndMetadata<byte[], byte[]> message: contents) {
- Object out = deserializationSchema.deserialize(message.message());
- LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
- }
- }
-
- private static void printTopic(String topicName, int elements, ExecutionConfig ec) {
- // write the sequence to log for debugging purposes
- Properties stdProps = standardCC.props().props();
- Properties newProps = new Properties(stdProps);
- newProps.setProperty("group.id", "topic-printer"+UUID.randomUUID().toString());
- newProps.setProperty("auto.offset.reset", "smallest");
- newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
-
- ConsumerConfig printerConfig = new ConsumerConfig(newProps);
- DeserializationSchema deserializer = new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), ec);
- printTopic(topicName, printerConfig, deserializer, elements);
- }
-
-}
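
Editorially worth noting: the tests above all terminate their otherwise unbounded streaming jobs by throwing a SuccessException from a sink and unwrapping it in tryExecute(). A minimal, standalone sketch of that pattern, assuming only the Flink streaming API shown in this diff:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SuccessByExceptionSketch {

    /** Marker exception a sink throws once it has validated all expected records. */
    public static class SuccessException extends Exception {
        private static final long serialVersionUID = 1L;
    }

    /** Runs the job and treats a SuccessException anywhere in the cause chain as success. */
    public static void tryExecute(StreamExecutionEnvironment env, String name) throws Exception {
        try {
            env.execute(name);
        } catch (Exception e) {
            // walk at most 20 causes, mirroring the guard in the test code above
            Throwable t = e.getCause();
            for (int depth = 0; t != null && depth < 20; depth++, t = t.getCause()) {
                if (t instanceof SuccessException) {
                    return; // the sink saw everything it expected
                }
            }
            throw e; // a genuine failure
        }
    }
}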
http://git-wip-us.apache.org/repos/asf/flink/blob/0fd60233/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/util/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/util/KafkaLocalSystemTime.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/util/KafkaLocalSystemTime.java
deleted file mode 100644
index 18fa46f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/util/KafkaLocalSystemTime.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.utils.Time;
-
-public class KafkaLocalSystemTime implements Time {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
-
- @Override
- public long milliseconds() {
- return System.currentTimeMillis();
- }
-
- public long nanoseconds() {
- return System.nanoTime();
- }
-
- @Override
- public void sleep(long ms) {
- try {
- Thread.sleep(ms);
- } catch (InterruptedException e) {
- LOG.warn("Interruption", e);
- }
- }
-
-}
-
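KafkaLocalSystemTime simply delegates Kafka's Time abstraction to the JVM clock so that an embedded broker can run inside a test. A hedged sketch of wiring it into a test broker, as the test above does; the port, log directory, and ZooKeeper address below are placeholders:

import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;

public class EmbeddedBrokerSketch {
    public static void main(String[] args) {
        // placeholders only: adapt port, log dir and ZooKeeper address to the test setup
        Properties props = new Properties();
        props.put("broker.id", "0");
        props.put("port", "9092");
        props.put("log.dir", "/tmp/kafka-test-logs");
        props.put("zookeeper.connect", "localhost:2181");

        KafkaServer server = new KafkaServer(new KafkaConfig(props), new KafkaLocalSystemTime());
        server.startup();
        // ... exercise the broker ...
        server.shutdown();
    }
}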
[5/6] flink git commit: [docs] Removed unnecessary Serializable from
ExecutionEnvironment JavaDocs
Posted by mb...@apache.org.
[docs] Removed unnecessary Serializable from ExecutionEnvironment JavaDocs
The same cleanup was applied to StreamExecutionEnvironment
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2315c7c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2315c7c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2315c7c5
Branch: refs/heads/master
Commit: 2315c7c5935a703eee10eb9eabb75aa8490d1457
Parents: 120bd0f
Author: mbalassi <mb...@apache.org>
Authored: Sun May 24 17:51:21 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri May 29 15:08:32 2015 +0200
----------------------------------------------------------------------
.../flink/api/java/ExecutionEnvironment.java | 30 +--------
.../environment/StreamExecutionEnvironment.java | 64 ++++++--------------
2 files changed, 21 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2315c7c5/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 9c76409..3a5b04f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -19,7 +19,6 @@
package org.apache.flink.api.java;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
@@ -600,9 +599,7 @@ public abstract class ExecutionEnvironment {
/**
* Creates a DataSet from the given non-empty collection. The type of the data set is that
- * of the elements in the collection. The elements need to be serializable (as defined by
- * {@link java.io.Serializable}), because the framework may move the elements into the cluster
- * if needed.
+ * of the elements in the collection.
* <p>
* The framework will try and determine the exact type from the collection elements.
* In case of generic elements, it may be necessary to manually supply the type information
@@ -632,13 +629,8 @@ public abstract class ExecutionEnvironment {
}
/**
- * Creates a DataSet from the given non-empty collection. The type of the data set is that
- * of the elements in the collection. The elements need to be serializable (as defined by
- * {@link java.io.Serializable}), because the framework may move the elements into the cluster
- * if needed.
- * <p>
- * Note that this operation will result in a non-parallel data source, i.e. a data source with
- * a parallelism of one.
+ * Creates a DataSet from the given non-empty collection. Note that this operation will result
+ * in a non-parallel data source, i.e. a data source with a parallelism of one.
* <p>
* The returned DataSet is typed to the given TypeInformation.
*
@@ -663,9 +655,6 @@ public abstract class ExecutionEnvironment {
* explicitly in the form of the type class (this is due to the fact that the Java compiler
* erases the generic type information).
* <p>
- * The iterator must be serializable (as defined in {@link java.io.Serializable}), because the
- * framework may move it to a remote environment, if needed.
- * <p>
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a parallelism of one.
*
@@ -686,9 +675,6 @@ public abstract class ExecutionEnvironment {
* is generic. In that case, the type class (as given in {@link #fromCollection(Iterator, Class)}
* does not supply all type information.
* <p>
- * The iterator must be serializable (as defined in {@link java.io.Serializable}), because the
- * framework may move it to a remote environment, if needed.
- * <p>
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a parallelism of one.
*
@@ -699,10 +685,6 @@ public abstract class ExecutionEnvironment {
* @see #fromCollection(Iterator, Class)
*/
public <X> DataSource<X> fromCollection(Iterator<X> data, TypeInformation<X> type) {
- if (!(data instanceof Serializable)) {
- throw new IllegalArgumentException("The iterator must be serializable.");
- }
-
return new DataSource<X>(this, new IteratorInputFormat<X>(data), type, Utils.getCallLocationName());
}
@@ -710,8 +692,6 @@ public abstract class ExecutionEnvironment {
/**
* Creates a new data set that contains the given elements. The elements must all be of the same type,
* for example, all of the {@link String} or {@link Integer}. The sequence of elements must not be empty.
- * Furthermore, the elements must be serializable (as defined in {@link java.io.Serializable}, because the
- * execution environment may ship the elements into the cluster.
* <p>
* The framework will try and determine the exact type from the collection elements.
* In case of generic elements, it may be necessary to manually supply the type information
@@ -738,8 +718,6 @@ public abstract class ExecutionEnvironment {
/**
* Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
* framework to create a parallel data source that returns the elements in the iterator.
- * The iterator must be serializable (as defined in {@link java.io.Serializable}, because the
- * execution environment may ship the elements into the cluster.
* <p>
* Because the iterator will remain unmodified until the actual execution happens, the type of data
* returned by the iterator must be given explicitly in the form of the type class (this is due to the
@@ -758,8 +736,6 @@ public abstract class ExecutionEnvironment {
/**
* Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
* framework to create a parallel data source that returns the elements in the iterator.
- * The iterator must be serializable (as defined in {@link java.io.Serializable}, because the
- * execution environment may ship the elements into the cluster.
* <p>
* Because the iterator will remain unmodified until the actual execution happens, the type of data
* returned by the iterator must be given explicitly in the form of the type information.
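A minimal usage sketch of the typed collection source whose JavaDocs are trimmed above, assuming a standard Flink batch setup:

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FromCollectionSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<String> data = Arrays.asList("a", "b", "c");
        // non-parallel source (parallelism one), typed by the given TypeInformation
        DataSet<String> ds = env.fromCollection(data, BasicTypeInfo.STRING_TYPE_INFO);

        ds.print();           // depending on the API version, print() may
        // env.execute();     // require an explicit execute() call
    }
}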
http://git-wip-us.apache.org/repos/asf/flink/blob/2315c7c5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 09a8788..8dd15bb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -70,7 +70,6 @@ import org.apache.hadoop.mapreduce.Job;
import java.io.File;
import java.io.IOException;
-import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -438,15 +437,12 @@ public abstract class StreamExecutionEnvironment {
/**
* Creates a new data stream that contains the given elements. The elements must all be of the same type, for
- * example, all of the {@link String} or {@link Integer}. The sequence of elements must not be empty. Furthermore,
- * the elements must be serializable (as defined in {@link java.io.Serializable}), because the execution
- * environment
- * may ship the elements into the cluster.
- * <p/>
+ * example, all of the {@link String} or {@link Integer}.
+ * <p>
* The framework will try and determine the exact type from the elements. In case of generic elements, it may be
* necessary to manually supply the type information via {@link #fromCollection(java.util.Collection,
* org.apache.flink.api.common.typeinfo.TypeInformation)}.
- * <p/>
+ * <p>
* Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
* degree of parallelism one.
*
@@ -471,15 +467,14 @@ public abstract class StreamExecutionEnvironment {
/**
* Creates a data stream from the given non-empty collection. The type of the data stream is that of the
- * elements in
- * the collection. The elements need to be serializable (as defined by {@link java.io.Serializable}), because the
- * framework may move the elements into the cluster if needed.
- * <p/>
+ * elements in the collection.
+ *
+ * <p>
* The framework will try and determine the exact type from the collection elements. In case of generic
- * elements, it
- * may be necessary to manually supply the type information via {@link #fromCollection(java.util.Collection,
- * org.apache.flink.api.common.typeinfo.TypeInformation)}.
- * <p/>
+ * elements, it may be necessary to manually supply the type information via
+ * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
+ * <p>
+ *
* Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
* degree of parallelism one.
*
@@ -503,12 +498,8 @@ public abstract class StreamExecutionEnvironment {
}
/**
- * Creates a data stream from the given non-empty collection. The type of the data stream is the type given by
- * typeInfo. The elements need to be serializable (as defined by {@link java.io.Serializable}), because the
- * framework may move the elements into the cluster if needed.
- * <p/>
- * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
- * degree of parallelism one.
+ * Creates a data stream from the given non-empty collection. Note that this operation will result in
+ * a non-parallel data stream source, i.e. a data stream source with a degree of parallelism one.
*
* @param data
* The collection of elements to create the data stream from
@@ -535,11 +526,7 @@ public abstract class StreamExecutionEnvironment {
* Creates a data stream from the given iterator. Because the iterator will remain unmodified until the actual
* execution happens, the type of data returned by the iterator must be given explicitly in the form of the type
* class (this is due to the fact that the Java compiler erases the generic type information).
- * <p/>
- * The iterator must be serializable (as defined in {@link java.io.Serializable}), because the framework may
- * move it
- * to a remote environment, if needed.
- * <p/>
+ * <p>
* Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
* degree of parallelism of one.
*
@@ -562,11 +549,7 @@ public abstract class StreamExecutionEnvironment {
* information. This method is useful for cases where the type is generic. In that case, the type class (as
* given in
* {@link #fromCollection(java.util.Iterator, Class)} does not supply all type information.
- * <p/>
- * The iterator must be serializable (as defined in {@link java.io.Serializable}), because the framework may
- * move it
- * to a remote environment, if needed.
- * <p/>
+ * <p>
* Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
* degree of parallelism one.
*
@@ -581,9 +564,6 @@ public abstract class StreamExecutionEnvironment {
public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT>
typeInfo) {
Preconditions.checkNotNull(data, "The iterator must not be null");
- if (!(data instanceof Serializable)) {
- throw new IllegalArgumentException("The iterator must be serializable.");
- }
SourceFunction<OUT> function = new FromIteratorFunction<OUT>(data);
return addSource(function, "Collection Source").returns(typeInfo);
@@ -597,12 +577,8 @@ public abstract class StreamExecutionEnvironment {
/**
* Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
- * framework to create a parallel data stream source that returns the elements in the iterator. The iterator
- * must be
- * serializable (as defined in {@link java.io.Serializable}, because the execution environment may ship the
- * elements
- * into the cluster.
- * <p/>
+ * framework to create a parallel data stream source that returns the elements in the iterator.
+ * <p>
* Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
* iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler
* erases the generic type information).
@@ -621,12 +597,8 @@ public abstract class StreamExecutionEnvironment {
/**
* Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
- * framework to create a parallel data stream source that returns the elements in the iterator. The iterator
- * must be
- * serializable (as defined in {@link java.io.Serializable}, because the execution environment may ship the
- * elements
- * into the cluster.
- * <p/>
+ * framework to create a parallel data stream source that returns the elements in the iterator.
+ * <p>
* Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
* iterator must be given explicitly in the form of the type information. This method is useful for cases where the
* type is generic. In that case, the type class (as given in {@link #fromParallelCollection(org.apache.flink.util.SplittableIterator,
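The streaming collection sources documented above mirror the batch API; a short hedged sketch, assuming the StreamExecutionEnvironment methods shown in this diff:

import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamCollectionSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // non-parallel source with a degree of parallelism of one
        DataStream<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3));
        stream.print();

        env.execute("collection source sketch");
    }
}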
[3/6] flink git commit: [FLINK-1687] [streaming] [api-extending]
Synchronizing streaming source API to batch source API
Posted by mb...@apache.org.
[FLINK-1687] [streaming] [api-extending] Synchronizing streaming source API to batch source API
Closes #521
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c3b67ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c3b67ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c3b67ed
Branch: refs/heads/master
Commit: 1c3b67edd007d6d3727f1786eb0a99a0ee3a64c9
Parents: 84fce54
Author: szape <ne...@gmail.com>
Authored: Thu Apr 16 14:21:16 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri May 29 15:08:32 2015 +0200
----------------------------------------------------------------------
.../environment/StreamExecutionEnvironment.java | 896 ++++++++++++++-----
.../source/FileMonitoringFunction.java | 2 -
.../functions/source/FileSourceFunction.java | 33 +-
.../functions/source/FromIteratorFunction.java | 44 +
.../flink/streaming/api/graph/StreamGraph.java | 2 +-
.../flink/streaming/api/graph/StreamNode.java | 6 +-
6 files changed, 720 insertions(+), 263 deletions(-)
----------------------------------------------------------------------
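Among the source methods this commit adds (see the diff below) are generateSequence and generateParallelSequence; a minimal usage sketch, assuming the API exactly as committed:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SequenceSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // created with parallelism one, so element order is guaranteed
        DataStreamSource<Long> ordered = env.generateSequence(1, 100);

        // created in parallel, so there is no ordering guarantee across subtasks
        DataStreamSource<Long> parallel = env.generateParallelSequence(1, 100);

        ordered.print();
        parallel.print();
        env.execute("sequence sources sketch");
    }
}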
http://git-wip-us.apache.org/repos/asf/flink/blob/1c3b67ed/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 02c8dad..a1fe60d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -17,24 +17,29 @@
package org.apache.flink.streaming.api.environment;
-import java.io.File;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-
+import com.esotericsoftware.kryo.Serializer;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.flink.api.java.io.PrimitiveInputFormat;
import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.io.TextValueInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
import org.apache.flink.client.program.ContextEnvironment;
@@ -44,25 +49,35 @@ import org.apache.flink.runtime.state.FileStateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
+import org.apache.flink.streaming.api.functions.source.FileReadFunction;
import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
import org.apache.flink.streaming.api.functions.source.GenSequenceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.NumberSequenceIterator;
+import org.apache.flink.util.SplittableIterator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
-import com.esotericsoftware.kryo.Serializer;
-import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
/**
- * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
+ * {@link org.apache.flink.api.java.ExecutionEnvironment} for streaming jobs. An instance of it is
* necessary to construct streaming topologies.
- *
*/
public abstract class StreamExecutionEnvironment {
@@ -100,9 +115,9 @@ public abstract class StreamExecutionEnvironment {
* Gets the parallelism with which operation are executed by default.
* Operations can individually override this value to use a specific
* parallelism.
- *
+ *
* @return The parallelism used by operations, unless they override that
- * value.
+ * value.
* @deprecated Please use {@link #getParallelism}
*/
@Deprecated
@@ -114,9 +129,9 @@ public abstract class StreamExecutionEnvironment {
* Gets the parallelism with which operation are executed by default.
* Operations can individually override this value to use a specific
* parallelism.
- *
+ *
* @return The parallelism used by operations, unless they override that
- * value.
+ * value.
*/
public int getParallelism() {
return config.getParallelism();
@@ -131,9 +146,9 @@ public abstract class StreamExecutionEnvironment {
* number of hardware contexts (CPU cores / threads). When executing the
* program via the command line client from a JAR file, the default degree
* of parallelism is the one configured for that setup.
- *
+ *
* @param parallelism
- * The parallelism
+ * The parallelism
* @deprecated Please use {@link #setParallelism}
*/
@Deprecated
@@ -150,9 +165,9 @@ public abstract class StreamExecutionEnvironment {
* number of hardware contexts (CPU cores / threads). When executing the
* program via the command line client from a JAR file, the default degree
* of parallelism is the one configured for that setup.
- *
+ *
* @param parallelism
- * The parallelism
+ * The parallelism
*/
public StreamExecutionEnvironment setParallelism(int parallelism) {
if (parallelism < 1) {
@@ -167,7 +182,7 @@ public abstract class StreamExecutionEnvironment {
* output buffers. By default the output buffers flush frequently to provide
* low latency and to aid smooth developer experience. Setting the parameter
* can result in three logical modes:
- *
+ * <p/>
* <ul>
* <li>
* A positive integer triggers flushing periodically by that integer</li>
@@ -177,9 +192,9 @@ public abstract class StreamExecutionEnvironment {
* -1 triggers flushing only when the output buffer is full thus maximizing
* throughput</li>
* </ul>
- *
+ *
* @param timeoutMillis
- * The maximum time between two output flushes.
+ * The maximum time between two output flushes.
*/
public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
if (timeoutMillis < -1) {
@@ -194,7 +209,7 @@ public abstract class StreamExecutionEnvironment {
* Disables operator chaining for streaming operators. Operator chaining
* allows non-shuffle operations to be co-located in the same thread fully
* avoiding serialization and de-serialization.
- *
+ *
* @return StreamExecutionEnvironment with chaining disabled.
*/
public StreamExecutionEnvironment disableOperatorChaining() {
@@ -205,16 +220,16 @@ public abstract class StreamExecutionEnvironment {
/**
* Method for enabling fault-tolerance. Activates monitoring and backup of
* streaming operator states.
- *
- * <p>
+ * <p/>
+ * <p/>
* Setting this option assumes that the job is used in production and thus
* if not stated explicitly otherwise with calling with the
* {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method
* in case of failure the job will be resubmitted to the cluster
* indefinitely.
- *
+ *
* @param interval
- * Time interval between state checkpoints in millis
+ * Time interval between state checkpoints in millis
*/
public StreamExecutionEnvironment enableCheckpointing(long interval) {
streamGraph.setCheckpointingEnabled(true);
@@ -225,8 +240,8 @@ public abstract class StreamExecutionEnvironment {
/**
* Method for enabling fault-tolerance. Activates monitoring and backup of
* streaming operator states.
- *
- * <p>
+ * <p/>
+ * <p/>
* Setting this option assumes that the job is used in production and thus
* if not stated explicitly otherwise with calling with the
* {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method
@@ -256,10 +271,10 @@ public abstract class StreamExecutionEnvironment {
* zero effectively disables fault tolerance. A value of {@code -1}
* indicates that the system default value (as defined in the configuration)
* should be used.
- *
+ *
* @param numberOfExecutionRetries
- * The number of times the system will try to re-execute failed
- * tasks.
+ * The number of times the system will try to re-execute failed
+ * tasks.
*/
public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
config.setNumberOfExecutionRetries(numberOfExecutionRetries);
@@ -269,9 +284,9 @@ public abstract class StreamExecutionEnvironment {
* Gets the number of times the system will try to re-execute failed tasks.
* A value of {@code -1} indicates that the system default value (as defined
* in the configuration) should be used.
- *
+ *
* @return The number of times the system will try to re-execute failed
- * tasks.
+ * tasks.
*/
public int getNumberOfExecutionRetries() {
return config.getNumberOfExecutionRetries();
@@ -281,7 +296,7 @@ public abstract class StreamExecutionEnvironment {
* Sets the maximum time frequency (milliseconds) for the flushing of the
* output buffers. For clarification on the extremal values see
* {@link #setBufferTimeout(long)}.
- *
+ *
* @return The timeout of the buffer.
*/
public long getBufferTimeout() {
@@ -291,9 +306,9 @@ public abstract class StreamExecutionEnvironment {
/**
* Sets the default parallelism that will be used for the local execution
* environment created by {@link #createLocalEnvironment()}.
- *
+ *
* @param parallelism
- * The parallelism to use as the default local parallelism.
+ * The parallelism to use as the default local parallelism.
*/
public static void setDefaultLocalParallelism(int parallelism) {
defaultLocalParallelism = parallelism;
@@ -305,15 +320,15 @@ public abstract class StreamExecutionEnvironment {
/**
* Adds a new Kryo default serializer to the Runtime.
- *
+ * <p/>
* Note that the serializer instance must be serializable (as defined by
* java.io.Serializable), because it may be distributed to the worker nodes
* by java serialization.
- *
+ *
* @param type
- * The class of the types serialized with the given serializer.
+ * The class of the types serialized with the given serializer.
* @param serializer
- * The serializer to use.
+ * The serializer to use.
*/
public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer) {
config.addDefaultKryoSerializer(type, serializer);
@@ -321,11 +336,11 @@ public abstract class StreamExecutionEnvironment {
/**
* Adds a new Kryo default serializer to the Runtime.
- *
+ *
* @param type
- * The class of the types serialized with the given serializer.
+ * The class of the types serialized with the given serializer.
* @param serializerClass
- * The class of the serializer to use.
+ * The class of the serializer to use.
*/
public void addDefaultKryoSerializer(Class<?> type,
Class<? extends Serializer<?>> serializerClass) {
@@ -334,15 +349,15 @@ public abstract class StreamExecutionEnvironment {
/**
* Registers the given type with a Kryo Serializer.
- *
+ * <p/>
* Note that the serializer instance must be serializable (as defined by
* java.io.Serializable), because it may be distributed to the worker nodes
* by java serialization.
- *
+ *
* @param type
- * The class of the types serialized with the given serializer.
+ * The class of the types serialized with the given serializer.
* @param serializer
- * The serializer to use.
+ * The serializer to use.
*/
public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer) {
config.registerTypeWithKryoSerializer(type, serializer);
@@ -351,11 +366,11 @@ public abstract class StreamExecutionEnvironment {
/**
* Registers the given Serializer via its class as a serializer for the
* given type at the KryoSerializer
- *
+ *
* @param type
- * The class of the types serialized with the given serializer.
+ * The class of the types serialized with the given serializer.
* @param serializerClass
- * The class of the serializer to use.
+ * The class of the serializer to use.
*/
public void registerTypeWithKryoSerializer(Class<?> type,
Class<? extends Serializer<?>> serializerClass) {
@@ -367,9 +382,9 @@ public abstract class StreamExecutionEnvironment {
* eventually serialized as a POJO, then the type is registered with the
* POJO serializer. If the type ends up being serialized with Kryo, then it
* will be registered at Kryo to make sure that only tags are written.
- *
+ *
* @param type
- * The class of the type to register.
+ * The class of the type to register.
*/
public void registerType(Class<?> type) {
if (type == null) {
@@ -390,31 +405,271 @@ public abstract class StreamExecutionEnvironment {
// --------------------------------------------------------------------------------------------
/**
- * Creates a DataStream that represents the Strings produced by reading the
- * given file line wise. The file will be read with the system's default
- * character set.
- *
+ * Creates a new data stream that contains a sequence of numbers. The data stream will be created with parallelism
+ * one, so the order of the elements is guaranteed.
+ *
+ * @param from
+ * The number to start at (inclusive)
+ * @param to
+ * The number to stop at (inclusive)
+ * @return A data stream, containing all numbers in the [from, to] interval
+ */
+ public DataStreamSource<Long> generateSequence(long from, long to) {
+ if (from > to) {
+ throw new IllegalArgumentException("Start of sequence must not be greater than the end");
+ }
+ return addSource(new GenSequenceFunction(from, to), "Sequence Source");
+ }
+
+ /**
+ * Creates a new data stream that contains a sequence of numbers. The data stream will be created in parallel, so
+ * there is no guarantee about the order of the elements.
+ *
+ * @param from
+ * The number to start at (inclusive)
+ * @param to
+ * The number to stop at (inclusive)
+ * @return A data stream, containing all numbers in the [from, to] interval
+ */
+ public DataStreamSource<Long> generateParallelSequence(long from, long to) {
+ return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Parallel " +
+ "Sequence source");
+ }
+
+ /**
+ * Creates a new data stream that contains the given elements. The elements must all be of the same type, for
+ * example, all of the {@link String} or {@link Integer}. The sequence of elements must not be empty. Furthermore,
+ * the elements must be serializable (as defined in {@link java.io.Serializable}), because the execution
+ * environment
+ * may ship the elements into the cluster.
+ * <p/>
+ * The framework will try and determine the exact type from the elements. In case of generic elements, it may be
+ * necessary to manually supply the type information via {@link #fromCollection(java.util.Collection,
+ * org.apache.flink.api.common.typeinfo.TypeInformation)}.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
+ * degree of parallelism one.
+ *
+ * @param data
+ * The array of elements to create the data stream from.
+ * @param <OUT>
+ * The type of the returned data stream
+ * @return The data stream representing the given array of elements
+ */
+ public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
+ if (data.length == 0) {
+ throw new IllegalArgumentException(
+ "fromElements needs at least one element as argument");
+ }
+
+ TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data[0]);
+
+ SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+
+ return addSource(function, "Elements source").returns(typeInfo);
+ }
+
+ /**
+ * Creates a data stream from the given non-empty collection. The type of the data stream is that of the
+ * elements in
+ * the collection. The elements need to be serializable (as defined by {@link java.io.Serializable}), because the
+ * framework may move the elements into the cluster if needed.
+ * <p/>
+ * The framework will try and determine the exact type from the collection elements. In case of generic
+ * elements, it
+ * may be necessary to manually supply the type information via {@link #fromCollection(java.util.Collection,
+ * org.apache.flink.api.common.typeinfo.TypeInformation)}.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
+ * degree of parallelism one.
+ *
+ * @param data
+ * The collection of elements to create the data stream from
+ * @param <OUT>
+ * The type of the returned data stream
+ * @return The data stream representing the given collection
+ */
+ public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
+ Preconditions.checkNotNull(data, "Collection must not be null");
+ if (data.isEmpty()) {
+ throw new IllegalArgumentException("Collection must not be empty");
+ }
+
+ TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data.iterator().next());
+ SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+ checkCollection(data, typeInfo.getTypeClass());
+
+ return addSource(function, "Collection Source").returns(typeInfo);
+ }
+
+ /**
+ * Creates a data stream from the given non-empty collection. The type of the data stream is the type given by
+ * typeInfo. The elements need to be serializable (as defined by {@link java.io.Serializable}), because the
+ * framework may move the elements into the cluster if needed.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
+ * degree of parallelism one.
+ *
+ * @param data
+ * The collection of elements to create the data stream from
+ * @param typeInfo
+ * The TypeInformation for the produced data stream
+ * @param <OUT>
+ * The type of the returned data stream
+ * @return The data stream representing the given collection
+ */
+ public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT>
+ typeInfo) {
+ Preconditions.checkNotNull(data, "Collection must not be null");
+ if (data.isEmpty()) {
+ throw new IllegalArgumentException("Collection must not be empty");
+ }
+
+ SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+ checkCollection(data, typeInfo.getTypeClass());
+
+ return addSource(function, "Collection Source").returns(typeInfo);
+ }
+
+ /**
+ * Creates a data stream from the given iterator. Because the iterator will remain unmodified until the actual
+ * execution happens, the type of data returned by the iterator must be given explicitly in the form of the type
+ * class (this is due to the fact that the Java compiler erases the generic type information).
+ * <p/>
+ * The iterator must be serializable (as defined in {@link java.io.Serializable}), because the framework may
+ * move it
+ * to a remote environment, if needed.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
+ * degree of parallelism of one.
+ *
+ * @param data
+ * The iterator of elements to create the data stream from
+ * @param type
+ * The class of the data produced by the iterator. Must not be a generic class.
+ * @param <OUT>
+ * The type of the returned data stream
+ * @return The data stream representing the elements in the iterator
+ * @see #fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation)
+ */
+ public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) {
+ return fromCollection(data, TypeExtractor.getForClass(type));
+ }
+
+ /**
+ * Creates a data stream from the given iterator. Because the iterator will remain unmodified until the actual
+ * execution happens, the type of data returned by the iterator must be given explicitly in the form of the type
+ * information. This method is useful for cases where the type is generic. In that case, the type class (as given
+ * in {@link #fromCollection(java.util.Iterator, Class)}) does not supply all type information.
+ * <p/>
+ * The iterator must be serializable (as defined in {@link java.io.Serializable}), because the framework may
+ * move it to a remote environment, if needed.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
+ * degree of parallelism of one.
+ *
+ * @param data
+ * The iterator of elements to create the data stream from
+ * @param typeInfo
+ * The TypeInformation for the produced data stream
+ * @param <OUT>
+ * The type of the returned data stream
+ * @return The data stream representing the elements in the iterator
+ */
+ public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT>
+ typeInfo) {
+ Preconditions.checkNotNull(data, "The iterator must not be null");
+ if (!(data instanceof Serializable)) {
+ throw new IllegalArgumentException("The iterator must be serializable.");
+ }
+
+ SourceFunction<OUT> function = new FromIteratorFunction<OUT>(data);
+ return addSource(function, "Collection Source").returns(typeInfo);
+ }
+
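As a sketch of the iterator variant above, a serializable iterator (the class is hypothetical; BasicTypeInfo is the type information class already imported by this file):

    public class CountingIterator implements java.util.Iterator<Long>, java.io.Serializable {
        private long current = 0;
        @Override public boolean hasNext() { return current < 100; }
        @Override public Long next() { return current++; }
        @Override public void remove() { throw new UnsupportedOperationException(); }
    }

    DataStreamSource<Long> counts = env.fromCollection(new CountingIterator(), BasicTypeInfo.LONG_TYPE_INFO);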
+ /**
+ * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
+ * framework to create a parallel data stream source that returns the elements in the iterator. The iterator
+ * must be serializable (as defined in {@link java.io.Serializable}), because the execution environment may ship
+ * the elements into the cluster.
+ * <p/>
+ * Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
+ * iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler
+ * erases the generic type information).
+ *
+ * @param iterator
+ * The iterator that produces the elements of the data stream
+ * @param type
+ * The class of the data produced by the iterator. Must not be a generic class.
+ * @param <OUT>
+ * The type of the returned data stream
+ * @return A data stream representing the elements in the iterator
+ */
+ public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type) {
+ return fromParallelCollection(iterator, TypeExtractor.getForClass(type));
+ }
+
+ /**
+ * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
+ * framework to create a parallel data stream source that returns the elements in the iterator. The iterator
+ * must be serializable (as defined in {@link java.io.Serializable}), because the execution environment may ship
+ * the elements into the cluster.
+ * <p/>
+ * Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
+ * iterator must be given explicitly in the form of the type information. This method is useful for cases where the
+ * type is generic. In that case, the type class (as given in {@link #fromParallelCollection(org.apache.flink.util.SplittableIterator,
+ * Class)}) does not supply all type information.
+ *
+ * @param iterator
+ * The iterator that produces the elements of the data stream
+ * @param typeInfo
+ * The TypeInformation for the produced data stream.
+ * @param <OUT>
+ * The type of the returned data stream
+ * @return A data stream representing the elements in the iterator
+ */
+ public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
+ typeInfo) {
+ return fromParallelCollection(iterator, typeInfo, "Parallel Collection source");
+ }
+
+ // private helper for passing different names
+ private <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
+ typeInfo, String operatorName) {
+ return addSource(new FromIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
+ }
+
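A usage sketch, assuming the NumberSequenceIterator that this class already imports (it implements SplittableIterator<Long>, so the source can be split across parallel instances):

    DataStreamSource<Long> sequence = env.fromParallelCollection(
            new NumberSequenceIterator(1L, 1000L), BasicTypeInfo.LONG_TYPE_INFO);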
+ /**
+ * Creates a data stream that represents the Strings produced by reading the given file line wise. The file will be
+ * read with the system's default character set.
+ *
* @param filePath
- * The path of the file, as a URI (e.g.,
- * "file:///some/local/file" or "hdfs://host:port/file/path").
- * @return The DataStream representing the text file.
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+ * @return The data stream that represents the data read from the given file as text lines
*/
public DataStreamSource<String> readTextFile(String filePath) {
Preconditions.checkNotNull(filePath, "The file path may not be null.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
- return addFileSource(format, typeInfo);
+ return createInput(format, typeInfo, "Read Text File Source");
}
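A minimal call sketch (the path is a placeholder):

    DataStream<String> lines = env.readTextFile("file:///some/local/file");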
/**
- * Creates a DataStream that represents the Strings produced by reading the
- * given file line wise. The file will be read with the given character set.
- *
+ * Creates a data stream that represents the Strings produced by reading the given file line wise. The
+ * {@link java.nio.charset.Charset} with the given name will be used to read the file.
+ *
* @param filePath
- * The path of the file, as a URI (e.g.,
- * "file:///some/local/file" or "hdfs://host:port/file/path").
- * @return The DataStream representing the text file.
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+ * @param charsetName
+ * The name of the character set used to read the file
+ * @return The data stream that represents the data read from the given file as text lines
*/
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
Preconditions.checkNotNull(filePath, "The file path may not be null.");
@@ -422,201 +677,352 @@ public abstract class StreamExecutionEnvironment {
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName(charsetName);
- return addFileSource(format, typeInfo);
+ return createInput(format, typeInfo, "Read Text File Source");
}
/**
- * Creates a DataStream that contains the contents of file created while
- * system watches the given path. The file will be read with the system's
- * default character set.
- *
+ * Creates a data stream that contains the contents of the files created while the system watches the given path.
+ * The files will be read with the system's default character set.
+ *
* @param filePath
- * The path of the file, as a URI (e.g.,
- * "file:///some/local/file" or "hdfs://host:port/file/path/").
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path/")
* @param intervalMillis
- * The interval of file watching in milliseconds.
+ * The interval of file watching in milliseconds
* @param watchType
- * The watch type of file stream. When watchType is
- * {@link WatchType#ONLY_NEW_FILES}, the system processes only
- * new files. {@link WatchType#REPROCESS_WITH_APPENDED} means
- * that the system re-processes all contents of appended file.
- * {@link WatchType#PROCESS_ONLY_APPENDED} means that the system
- * processes only appended contents of files.
- *
+ * The watch type of the file stream. When watchType is {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#ONLY_NEW_FILES}, the system
+ * processes only new files. {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#REPROCESS_WITH_APPENDED} means that the system re-processes
+ * all contents of appended files. {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#PROCESS_ONLY_APPENDED} means that the system processes
+ * only appended contents of files.
* @return The DataStream containing the given directory.
*/
public DataStream<String> readFileStream(String filePath, long intervalMillis,
WatchType watchType) {
-// DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
-// filePath, intervalMillis, watchType), "File Stream");
-// return source.flatMap(new FileReadFunction());
- return null;
+ DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
+ filePath, intervalMillis, watchType), "Read File Stream source");
+
+ return source.flatMap(new FileReadFunction());
}
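A usage sketch with a hypothetical directory, polling once per second and reading only the appended contents:

    DataStream<String> appended = env.readFileStream(
            "file:///some/local/dir", 1000, WatchType.PROCESS_ONLY_APPENDED);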
/**
- * Creates a new DataStream that contains the given elements. The elements
- * must all be of the same type, for example, all of the String or Integer.
- * The sequence of elements must not be empty. Furthermore, the elements
- * must be serializable (as defined in java.io.Serializable), because the
- * execution environment may ship the elements into the cluster.
- *
- * @param data
- * The collection of elements to create the DataStream from.
- * @param <OUT>
- * type of the returned stream
- * @return The DataStream representing the elements.
+ * Creates a data stream that represents the strings produced by reading the given file line wise. This method is
+ * similar to {@link #readTextFile(String)}, but it produces a data stream with mutable {@link org.apache.flink.types.StringValue}
+ * objects, rather than Java Strings. StringValues can be used to tune implementations to be less object and
+ * garbage collection heavy.
+ * <p/>
+ * The file will be read with the system's default character set.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+ * @return A data stream that represents the data read from the given file as text lines
*/
- public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
- if (data.length == 0) {
- throw new IllegalArgumentException(
- "fromElements needs at least one element as argument");
- }
+ public DataStreamSource<StringValue> readTextFileWithValue(String filePath) {
+ Preconditions.checkNotNull(filePath, "The file path may not be null.");
+ TextValueInputFormat format = new TextValueInputFormat(new Path(filePath));
+ TypeInformation<StringValue> typeInfo = new ValueTypeInfo<StringValue>(StringValue.class);
- TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data[0]);
+ return createInput(format, typeInfo, "Read Text File with Value source");
+ }
- SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+ /**
+ * Creates a data stream that represents the Strings produced by reading the given file line wise. This method is
+ * similar to {@link #readTextFile(String, String)}, but it produces a data stream with mutable {@link org.apache.flink.types.StringValue}
+ * objects, rather than Java Strings. StringValues can be used to tune implementations to be less object and
+ * garbage collection heavy.
+ * <p/>
+ * The {@link java.nio.charset.Charset} with the given name will be used to read the file.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+ * @param charsetName
+ * The name of the character set used to read the file
+ * @param skipInvalidLines
+ * A flag to indicate whether to skip lines that cannot be read with the given character set
+ * @return A data stream that represents the data read from the given file as text lines
+ */
+ public DataStreamSource<StringValue> readTextFileWithValue(String filePath, String charsetName, boolean
+ skipInvalidLines) {
+ Preconditions.checkNotNull(filePath, "The file path may not be null.");
- return addSource(function, "Elements source").returns(outTypeInfo);
+ TextValueInputFormat format = new TextValueInputFormat(new Path(filePath));
+ TypeInformation<StringValue> typeInfo = new ValueTypeInfo<StringValue>(StringValue.class);
+ format.setCharsetName(charsetName);
+ format.setSkipInvalidLines(skipInvalidLines);
+ return createInput(format, typeInfo, "Read Text File with Value source");
}
/**
- * Creates a DataStream from the given non-empty collection. The type of the
- * DataStream is that of the elements in the collection. The elements need
- * to be serializable (as defined by java.io.Serializable), because the
- * framework may move the elements into the cluster if needed.
- *
- * @param data
- * The collection of elements to create the DataStream from.
+ * Reads the given file with the given input format.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+ * @param inputFormat
+ * The input format used to create the data stream
* @param <OUT>
- * type of the returned stream
- * @return The DataStream representing the elements.
+ * The type of the returned data stream
+ * @return The data stream that represents the data read from the given file
*/
- public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
- if (data == null) {
- throw new NullPointerException("Collection must not be null");
+ public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath) {
+ Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
+ Preconditions.checkNotNull(filePath, "The file path must not be null.");
+
+ inputFormat.setFilePath(new Path(filePath));
+ try {
+ return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat), "Read File source");
+ } catch (Exception e) {
+ throw new InvalidProgramException("The type returned by the input format could not be automatically " +
+ "determined. Please specify the TypeInformation of the produced type explicitly by using the " +
+ "'createInput(InputFormat, TypeInformation)' method instead.");
}
+ }
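For illustration, a sketch that reads a file with an explicit input format (the path is a placeholder); the element type is derived from the format by reflection:

    TextInputFormat format = new TextInputFormat(new Path("file:///some/local/file"));
    DataStreamSource<String> lines = env.readFile(format, "file:///some/local/file");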
- if (data.isEmpty()) {
- throw new IllegalArgumentException("Collection must not be empty");
- }
+ /**
+ * Creates a data stream that represents the primitive type produced by reading the given file line wise.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+ * @param typeClass
+ * The primitive type class to be read
+ * @param <OUT>
+ * The type of the returned data stream
+ * @return A data stream that represents the data read from the given file as a primitive type
+ */
+ public <OUT> DataStreamSource<OUT> readFileOfPrimitives(String filePath, Class<OUT> typeClass) {
+ Preconditions.checkNotNull(filePath, "The file path may not be null.");
+ PrimitiveInputFormat<OUT> inputFormat = new PrimitiveInputFormat<OUT>(new Path(filePath), typeClass);
+ TypeInformation<OUT> typeInfo = TypeExtractor.getForClass(typeClass);
- TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next());
- SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+ return createInput(inputFormat, typeInfo, "Read File of Primitives source");
+ }
+
+ /**
+ * Creates a data stream that represents the primitive type produced by reading the given file in a delimited way.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+ * @param delimiter
+ * The delimiter of the given file
+ * @param typeClass
+ * The primitive type class to be read
+ * @param <OUT>
+ * The type of the returned data stream
+ * @return A data stream that represents the data read from the given file as a primitive type.
+ */
+ public <OUT> DataStreamSource<OUT> readFileOfPrimitives(String filePath, String delimiter, Class<OUT> typeClass) {
+ Preconditions.checkNotNull(filePath, "The file path may not be null.");
+ PrimitiveInputFormat<OUT> inputFormat = new PrimitiveInputFormat<OUT>(new Path(filePath), delimiter,
+ typeClass);
+ TypeInformation<OUT> typeInfo = TypeExtractor.getForClass(typeClass);
- return addSource(function, "Collection Source").returns(outTypeInfo);
+ return createInput(inputFormat, typeInfo, "Read File of Primitives source");
}
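Two hedged call sketches of the primitive readers above (paths are placeholders): line-wise longs, then comma-delimited strings:

    DataStreamSource<Long> ids = env.readFileOfPrimitives("file:///some/ids", Long.class);
    DataStreamSource<String> words = env.readFileOfPrimitives("file:///some/words", ",", String.class);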
/**
- * Creates a new DataStream that contains the strings received infinitely
- * from socket. Received strings are decoded by the system's default
- * character set. On the termination of the socket server connection retries
- * can be initiated.
- *
- * <p>
- * Let us note that the socket itself does not report on abort and as a
- * consequence retries are only initiated when the socket was gracefully
- * terminated.
- * </p>
- *
+ * Creates a data stream from the given {@link org.apache.hadoop.mapred.FileInputFormat}. The given inputPath is
+ * added to the provided {@link org.apache.hadoop.mapred.JobConf}.
+ */
+ public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V>
+ mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
+ DataStreamSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
+
+ org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
+
+ return result;
+ }
+
+ /**
+ * Creates a data stream from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A {@link
+ * org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
+ */
+ public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V>
+ mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
+ return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
+ }
+
+ /**
+ * Creates a data stream from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The given
+ * inputPath is added to the provided {@link org.apache.hadoop.mapreduce.Job}.
+ */
+ public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input
+ .FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws
+ IOException {
+ DataStreamSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
+
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache
+ .hadoop.fs.Path(inputPath));
+
+ return result;
+ }
+
+ /**
+ * Creates a data stream from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A {@link
+ * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
+ */
+ public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input
+ .FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) throws
+ IOException {
+ return readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance());
+ }
+
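A sketch using Hadoop's own mapred TextInputFormat, which yields Tuple2<LongWritable, Text> records (the HDFS path is a placeholder; LongWritable and Text come from org.apache.hadoop.io):

    DataStreamSource<Tuple2<LongWritable, Text>> hadoopLines = env.readHadoopFile(
            new org.apache.hadoop.mapred.TextInputFormat(),
            LongWritable.class, Text.class, "hdfs://host:port/file/path");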
+ /**
+ * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
+ * decoded by the system's default character set. When the socket server connection terminates, retries can be
+ * initiated.
+ * <p/>
+ * Note that the socket itself does not report an abort, so retries are only initiated when the socket was
+ * gracefully terminated.
+ *
* @param hostname
- * The host name which a server socket bind.
+ * The host name which the server socket is bound to
* @param port
- * The port number which a server socket bind. A port number of 0
- * means that the port number is automatically allocated.
+ * The port number which the server socket is bound to. A port number of 0 means that the port number is
+ * automatically allocated.
* @param delimiter
- * A character which split received strings into records.
+ * A character which splits received strings into records
* @param maxRetry
- * The maximal retry interval in seconds while the program waits
- * for a socket that is temporarily down. Reconnection is
- * initiated every second. A number of 0 means that the reader is
- * immediately terminated, while a negative value ensures
- * retrying forever.
- * @return A DataStream, containing the strings received from socket.
- *
+ * The maximal retry interval in seconds while the program waits for a socket that is temporarily down.
+ * Reconnection is initiated every second. A number of 0 means that the reader is immediately terminated,
+ * while a negative value ensures retrying forever.
+ * @return A data stream containing the strings received from the socket
*/
- public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter,
- long maxRetry) {
+ public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
"Socket Stream");
}
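A usage sketch (host and port are placeholders); a negative maxRetry keeps reconnecting forever:

    DataStreamSource<String> socketLines = env.socketTextStream("localhost", 9999, '\n', -1);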
/**
- * Creates a new DataStream that contains the strings received infinitely
- * from socket. Received strings are decoded by the system's default
- * character set. The reader is terminated immediately when socket is down.
- *
+ * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
+ * decoded by the system's default character set. The reader is terminated immediately when the socket is down.
+ *
* @param hostname
- * The host name which a server socket bind.
+ * The host name which the server socket is bound to
* @param port
- * The port number which a server socket bind. A port number of 0
- * means that the port number is automatically allocated.
+ * The port number which the server socket is bound to. A port number of 0 means that the port number is
+ * automatically allocated.
* @param delimiter
- * A character which split received strings into records.
- * @return A DataStream, containing the strings received from socket.
+ * A character which splits received strings into records
+ * @return A data stream containing the strings received from the socket
*/
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
return socketTextStream(hostname, port, delimiter, 0);
}
/**
- * Creates a new DataStream that contains the strings received infinitely
- * from socket. Received strings are decoded by the system's default
- * character set, uses '\n' as delimiter. The reader is terminated
- * immediately when socket is down.
- *
+ * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
+ * decoded by the system's default character set, using '\n' as the delimiter. The reader is terminated
+ * immediately when the socket is down.
+ *
* @param hostname
- * The host name which a server socket bind.
+ * The host name which the server socket is bound to
* @param port
- * The port number which a server socket bind. A port number of 0
- * means that the port number is automatically allocated.
- * @return A DataStream, containing the strings received from socket.
+ * The port number which the server socket is bound to. A port number of 0 means that the port number is
+ * automatically allocated.
+ * @return A data stream containing the strings received from the socket
*/
public DataStreamSource<String> socketTextStream(String hostname, int port) {
return socketTextStream(hostname, port, '\n');
}
/**
- * Creates a new DataStream that contains a sequence of numbers.
- *
- * @param from
- * The number to start at (inclusive).
- * @param to
- * The number to stop at (inclusive)
- * @return A DataStrean, containing all number in the [from, to] interval.
+ * Creates a data stream from the given {@link org.apache.hadoop.mapred.InputFormat}.
*/
- public DataStreamSource<Long> generateSequence(long from, long to) {
- if (from > to) {
- throw new IllegalArgumentException("Start of sequence must not be greater than the end");
- }
- return addSource(new GenSequenceFunction(from, to), "Sequence Source");
+ public <K, V> DataStreamSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K, V>
+ mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+ HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
+
+ return createInput(hadoopInputFormat, TypeExtractor.getInputFormatTypes(hadoopInputFormat),
+ "Hadoop Input source");
+ }
+
+ /**
+ * Creates a data stream from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
+ */
+ public <K, V> DataStreamSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K, V>
+ mapredInputFormat, Class<K> key, Class<V> value, Job job) {
+ org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink
+ .api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
+
+ return createInput(hadoopInputFormat, TypeExtractor.getInputFormatTypes(hadoopInputFormat),
+ "Hadoop Input source");
+ }
+
+ /**
+ * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}. The data
+ * stream will not be immediately created - instead, this method returns a data stream that will be lazily created
+ * from the input format once the program is executed.
+ * <p/>
+ * Since all data streams need specific information about their types, this method needs to determine the type of
+ * the data produced by the input format. It will attempt to determine the data type by reflection, unless the
+ * input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface. In the
+ * latter case, this method will invoke the
+ * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine the data
+ * type produced by the input format.
+ *
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @param <OUT>
+ * The type of the returned data stream
+ * @return The data stream that represents the data created by the input format
+ */
+ public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat) {
+ return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat), "Custom File source");
+ }
+
+ /**
+ * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}. The data
+ * stream will not be immediately created - instead, this method returns a data stream that will be lazily created
+ * from the input format once the program is executed.
+ * <p/>
+ * The data stream is typed to the given TypeInformation. This method is intended for input formats where the
+ * return type cannot be determined by reflection analysis, and that do not implement the {@link
+ * org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+ *
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @param typeInfo
+ * The TypeInformation for the produced data stream
+ * @param <OUT>
+ * The type of the returned data stream
+ * @return The data stream that represents the data created by the input format
+ */
+ public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) {
+ return createInput(inputFormat, typeInfo, "Custom File source");
}
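A sketch of the explicitly-typed variant (the path is a placeholder); TextInputFormat's type is in fact discoverable by reflection, but the call shape is the same for formats where it is not:

    TextInputFormat format = new TextInputFormat(new Path("file:///some/local/file"));
    DataStreamSource<String> lines = env.createInput(format, BasicTypeInfo.STRING_TYPE_INFO);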
- private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
- TypeInformation<String> typeInfo) {
- FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
- DataStreamSource<String> returnStream = addSource(function, "File Source");
+ // private helper for passing different names
+ private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat,
+ TypeInformation<OUT> typeInfo, String sourceName) {
+ FileSourceFunction<OUT> function = new FileSourceFunction<OUT>(inputFormat, typeInfo);
+ DataStreamSource<OUT> returnStream = addSource(function, sourceName).returns(typeInfo);
streamGraph.setInputFormat(returnStream.getId(), inputFormat);
return returnStream;
}
/**
- * Create a DataStream using a user defined source function for arbitrary
- * source functionality.</p> By default sources have a parallelism of 1. To
- * enable parallel execution, the user defined source should implement
- * {@link ParallelSourceFunction} or extend
- * {@link RichParallelSourceFunction}. In these cases the resulting source
- * will have the parallelism of the environment. To change this afterwards
- * call {@link DataStreamSource#setParallelism(int)}
- *
- *
+ * Adds a data source with a custom type information, thus opening a
+ * {@link org.apache.flink.streaming.api.datastream.DataStream}. Only in very special cases does the user need to
+ * supply type information. Otherwise use
+ * {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
+ * <p/>
+ * By default sources have a parallelism of 1. To enable parallel execution, the user defined source should
+ * implement {@link org.apache.flink.streaming.api.functions.source.ParallelSourceFunction} or extend {@link
+ * org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction}. In these cases the resulting source
+ * will have the parallelism of the environment. To change this afterwards call {@link
+ * org.apache.flink.streaming.api.datastream.DataStreamSource#setParallelism(int)}
+ *
* @param function
- * the user defined function
+ * the user defined function
* @param <OUT>
- * type of the returned stream
+ * type of the returned stream
* @return the data stream constructed
*/
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
- return addSource(function, "Custom source");
+ return addSource(function, "Custom Source");
}
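As a sketch, a custom non-parallel source written against the reachedEnd()/next() contract that FromIteratorFunction demonstrates later in this commit (the class name is hypothetical):

    public class TenNumbersSource implements SourceFunction<Long> {
        private static final long serialVersionUID = 1L;
        private long counter = 0;

        @Override
        public boolean reachedEnd() throws Exception {
            return counter >= 10; // end the stream after ten elements
        }

        @Override
        public Long next() throws Exception {
            return counter++;
        }
    }

    DataStream<Long> numbers = env.addSource(new TenNumbersSource());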
/**
@@ -626,27 +1032,27 @@ public abstract class StreamExecutionEnvironment {
* {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
*
* @param function
- * the user defined function
+ * the user defined function
* @param sourceName
- * Name of the data source
+ * Name of the data source
* @param <OUT>
- * type of the returned stream
+ * type of the returned stream
* @return the data stream constructed
*/
@SuppressWarnings("unchecked")
- private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
+ public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
- TypeInformation<OUT> outTypeInfo;
+ TypeInformation<OUT> typeInfo;
if (function instanceof ResultTypeQueryable) {
- outTypeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
+ typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
} else {
try {
- outTypeInfo = TypeExtractor.createTypeInfo(
+ typeInfo = TypeExtractor.createTypeInfo(
SourceFunction.class,
function.getClass(), 0, null, null);
} catch (InvalidTypesException e) {
- outTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo("Custom source", e);
+ typeInfo = (TypeInformation<OUT>) new MissingTypeInfo("Custom source", e);
}
}
@@ -655,8 +1061,8 @@ public abstract class StreamExecutionEnvironment {
ClosureCleaner.clean(function, true);
StreamOperator<OUT> sourceOperator = new StreamSource<OUT>(function);
- return new DataStreamSource<OUT>(this, sourceName, outTypeInfo, sourceOperator, isParallel,
- sourceName);
+ return new DataStreamSource<OUT>(this, sourceName, typeInfo, sourceOperator,
+ isParallel, sourceName);
}
// --------------------------------------------------------------------------------------------
@@ -668,9 +1074,9 @@ public abstract class StreamExecutionEnvironment {
* program is currently executed. If the program is invoked standalone, this
* method returns a local execution environment, as returned by
* {@link #createLocalEnvironment()}.
- *
+ *
* @return The execution environment of the context in which the program is
- * executed.
+ * executed.
*/
public static StreamExecutionEnvironment getExecutionEnvironment() {
if (currentEnvironment != null) {
@@ -700,7 +1106,7 @@ public abstract class StreamExecutionEnvironment {
* environment was created in. The default parallelism of the local
* environment is the number of hardware contexts (CPU cores / threads),
* unless it was specified differently by {@link #setParallelism(int)}.
- *
+ *
* @return A local execution environment.
*/
public static LocalStreamEnvironment createLocalEnvironment() {
@@ -712,9 +1118,9 @@ public abstract class StreamExecutionEnvironment {
* will run the program in a multi-threaded fashion in the same JVM as the
* environment was created in. It will use the parallelism specified in the
* parameter.
- *
+ *
* @param parallelism
- * The parallelism for the local environment.
+ * The parallelism for the local environment.
* @return A local execution environment with the specified parallelism.
*/
public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
@@ -724,24 +1130,25 @@ public abstract class StreamExecutionEnvironment {
}
// TODO:fix cluster default parallelism
+
/**
* Creates a {@link RemoteStreamEnvironment}. The remote environment sends
* (parts of) the program to a cluster for execution. Note that all file
* paths used in the program must be accessible from the cluster. The
* execution will use no parallelism, unless the parallelism is set
* explicitly via {@link #setParallelism}.
- *
+ *
* @param host
- * The host name or address of the master (JobManager), where the
- * program should be executed.
+ * The host name or address of the master (JobManager), where the
+ * program should be executed.
* @param port
- * The port of the master (JobManager), where the program should
- * be executed.
+ * The port of the master (JobManager), where the program should
+ * be executed.
* @param jarFiles
- * The JAR files with code that needs to be shipped to the
- * cluster. If the program uses user-defined functions,
- * user-defined input formats, or any libraries, those must be
- * provided in the JAR files.
+ * The JAR files with code that needs to be shipped to the
+ * cluster. If the program uses user-defined functions,
+ * user-defined input formats, or any libraries, those must be
+ * provided in the JAR files.
* @return A remote environment that executes the program on a cluster.
*/
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
@@ -755,20 +1162,20 @@ public abstract class StreamExecutionEnvironment {
* (parts of) the program to a cluster for execution. Note that all file
* paths used in the program must be accessible from the cluster. The
* execution will use the specified parallelism.
- *
+ *
* @param host
- * The host name or address of the master (JobManager), where the
- * program should be executed.
+ * The host name or address of the master (JobManager), where the
+ * program should be executed.
* @param port
- * The port of the master (JobManager), where the program should
- * be executed.
+ * The port of the master (JobManager), where the program should
+ * be executed.
* @param parallelism
- * The parallelism to use during the execution.
+ * The parallelism to use during the execution.
* @param jarFiles
- * The JAR files with code that needs to be shipped to the
- * cluster. If the program uses user-defined functions,
- * user-defined input formats, or any libraries, those must be
- * provided in the JAR files.
+ * The JAR files with code that needs to be shipped to the
+ * cluster. If the program uses user-defined functions,
+ * user-defined input formats, or any libraries, those must be
+ * provided in the JAR files.
* @return A remote environment that executes the program on a cluster.
*/
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
@@ -782,34 +1189,34 @@ public abstract class StreamExecutionEnvironment {
* Triggers the program execution. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
- * <p>
+ * <p/>
* The program execution will be logged and displayed with a generated
* default name.
- *
+ *
* @return The result of the job execution, containing elapsed time and
- * accumulators.
+ * accumulators.
* @throws Exception
- **/
+ */
public abstract JobExecutionResult execute() throws Exception;
/**
* Triggers the program execution. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
- * <p>
+ * <p/>
* The program execution will be logged and displayed with the provided name
- *
+ *
* @param jobName
- * Desired name of the job
+ * Desired name of the job
* @return The result of the job execution, containing elapsed time and
- * accumulators.
+ * accumulators.
* @throws Exception
- **/
+ */
public abstract JobExecutionResult execute(String jobName) throws Exception;
/**
- * Getter of the {@link StreamGraph} of the streaming job.
- *
+ * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
+ *
* @return The streamgraph representing the transformations
*/
public StreamGraph getStreamGraph() {
@@ -821,7 +1228,7 @@ public abstract class StreamExecutionEnvironment {
* returns it as a String using a JSON representation of the execution data
* flow graph. Note that this needs to be called, before the plan is
* executed.
- *
+ *
* @return The execution plan of the program, as a JSON String.
*/
public String getExecutionPlan() {
@@ -832,4 +1239,17 @@ public abstract class StreamExecutionEnvironment {
currentEnvironment = eef.createExecutionEnvironment();
}
+ private static <OUT> void checkCollection(Collection<OUT> elements, Class<OUT> viewedAs) {
+ Preconditions.checkNotNull(viewedAs);
+
+ for (OUT elem : elements) {
+ Preconditions.checkNotNull(elem, "The collection must not contain null elements.");
+
+ if (!viewedAs.isAssignableFrom(elem.getClass())) {
+ throw new IllegalArgumentException("The elements in the collection are not all subclasses of " +
+ viewedAs.getCanonicalName());
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c3b67ed/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
index 1a177c6..f74d81c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
@@ -55,8 +55,6 @@ public class FileMonitoringFunction extends RichSourceFunction<Tuple3<String, Lo
private Map<String, Long> offsetOfFiles;
private Map<String, Long> modificationTimes;
- private volatile boolean isRunning = false;
-
private Queue<Tuple3<String, Long, Long>> pendingFiles;
public FileMonitoringFunction(String path, long interval, WatchType watchType) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1c3b67ed/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
index f1dcd2b..af00d4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
@@ -17,9 +17,6 @@
package org.apache.flink.streaming.api.functions.source;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -28,24 +25,23 @@ import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-public class FileSourceFunction extends RichParallelSourceFunction<String> {
- private static final long serialVersionUID = 1L;
-
- private InputSplitProvider provider;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
- private InputFormat<String, ?> inputFormat;
+public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
+ private static final long serialVersionUID = 1L;
- private TypeInformation<String> typeInfo;
- private transient TypeSerializer<String> serializer;
+ private TypeInformation<OUT> typeInfo;
+ private transient TypeSerializer<OUT> serializer;
- private InputFormat<String, InputSplit> format;
+ private InputSplitProvider provider;
+ private InputFormat<OUT, InputSplit> format;
private Iterator<InputSplit> splitIterator;
+ private transient OUT nextElement;
- private transient String nextElement;
-
- public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
- this.inputFormat = format;
+ public FileSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
+ this.format = (InputFormat<OUT, InputSplit>) format;
this.typeInfo = typeInfo;
}
@@ -54,10 +50,9 @@ public class FileSourceFunction extends RichParallelSourceFunction<String> {
public void open(Configuration parameters) throws Exception {
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
this.provider = context.getInputSplitProvider();
- inputFormat.configure(context.getTaskStubParameters());
+ format.configure(context.getTaskStubParameters());
serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
- format = (InputFormat<String, InputSplit>) this.inputFormat;
splitIterator = getInputSplits();
if (splitIterator.hasNext()) {
format.open(splitIterator.next());
@@ -135,12 +130,12 @@ public class FileSourceFunction extends RichParallelSourceFunction<String> {
}
@Override
- public String next() throws Exception {
+ public OUT next() throws Exception {
if (reachedEnd()) {
throw new RuntimeException("End of FileSource reached.");
}
- String result = nextElement;
+ OUT result = nextElement;
nextElement = null;
return result;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c3b67ed/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
new file mode 100644
index 0000000..d46b1f2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+public class FromIteratorFunction<T> implements SourceFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ Iterator<T> iterator;
+
+ public FromIteratorFunction(Iterator<T> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return !iterator.hasNext();
+ }
+
+ @Override
+ public T next() throws Exception {
+ return iterator.next();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c3b67ed/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 009366c..aa0f164 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -304,7 +304,7 @@ public class StreamGraph extends StreamingPlan {
getStreamNode(vertexID).setOperator(operatorObject);
}
- public void setInputFormat(Integer vertexID, InputFormat<String, ?> inputFormat) {
+ public void setInputFormat(Integer vertexID, InputFormat<?, ?> inputFormat) {
getStreamNode(vertexID).setInputFormat(inputFormat);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c3b67ed/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index adb07a8..ddc71cb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -60,7 +60,7 @@ public class StreamNode implements Serializable {
private Class<? extends AbstractInvokable> jobVertexClass;
- private InputFormat<String, ?> inputFormat;
+ private InputFormat<?, ?> inputFormat;
public StreamNode(StreamExecutionEnvironment env, Integer ID, StreamOperator<?> operator,
String operatorName, List<OutputSelector<?>> outputSelector,
@@ -194,11 +194,11 @@ public class StreamNode implements Serializable {
return jobVertexClass;
}
- public InputFormat<String, ?> getInputFormat() {
+ public InputFormat<?, ?> getInputFormat() {
return inputFormat;
}
- public void setInputFormat(InputFormat<String, ?> inputFormat) {
+ public void setInputFormat(InputFormat<?, ?> inputFormat) {
this.inputFormat = inputFormat;
}
[2/6] flink git commit: [streaming] [scala] [api-breaking]
StreamExecutionEnvironment API update
Posted by mb...@apache.org.
[streaming] [scala] [api-breaking] StreamExecutionEnvironment API update
Closes #738
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/145b2ba7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/145b2ba7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/145b2ba7
Branch: refs/heads/master
Commit: 145b2ba7f8ba72ed56992e74711290d18d32caf2
Parents: 2315c7c
Author: mbalassi <mb...@apache.org>
Authored: Wed May 27 12:40:33 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri May 29 15:08:32 2015 +0200
----------------------------------------------------------------------
.../flink/api/scala/ExecutionEnvironment.scala | 12 +-
.../environment/StreamExecutionEnvironment.java | 74 -----------
.../api/StreamExecutionEnvironmentTest.java | 5 +
.../api/scala/StreamExecutionEnvironment.scala | 130 ++++++++++++++++---
4 files changed, 123 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/145b2ba7/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index e01ff3b..28e8458 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -455,8 +455,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
}
/**
- * Creates a DataSet from the given non-empty [[Seq]]. The elements need to be serializable
- * because the framework may move the elements into the cluster if needed.
+ * Creates a DataSet from the given non-empty [[Seq]].
*
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a parallelism of one.
@@ -476,8 +475,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
}
/**
- * Creates a DataSet from the given [[Iterator]]. The iterator must be serializable because the
- * framework might move into the cluster if needed.
+ * Creates a DataSet from the given [[Iterator]].
*
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a parallelism of one.
@@ -496,8 +494,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
}
/**
- * Creates a new data set that contains the given elements. The elements must all be of the
- * same type and must be serializable.
+ * Creates a new data set that contains the given elements.
*
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a parallelism of one.
@@ -511,8 +508,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
/**
* Creates a new data set that contains elements in the iterator. The iterator is splittable,
* allowing the framework to create a parallel data source that returns the elements in the
- * iterator. The iterator must be serializable because the execution environment may ship the
- * elements into the cluster.
+ * iterator.
*/
def fromParallelCollection[T: ClassTag : TypeInformation](
iterator: SplittableIterator[T]): DataSet[T] = {
http://git-wip-us.apache.org/repos/asf/flink/blob/145b2ba7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 8dd15bb..651da63 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -29,11 +29,9 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.io.PrimitiveInputFormat;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.io.TextValueInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
@@ -65,11 +63,8 @@ import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.NumberSequenceIterator;
import org.apache.flink.util.SplittableIterator;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
import java.io.File;
-import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -802,52 +797,6 @@ public abstract class StreamExecutionEnvironment {
}
/**
- * Creates a data stream from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A {@link
- * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
- */
- public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V>
- mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
- DataStreamSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
-
- org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
-
- return result;
- }
-
- /**
- * Creates a data stream from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A {@link
- * org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
- */
- public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V>
- mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
- return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
- }
-
- /**
- * Creates a data stream from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A {@link
- * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
- */
- public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input
- .FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws
- IOException {
- DataStreamSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
-
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache
- .hadoop.fs.Path(inputPath));
-
- return result;
- }
-
- /**
- * Creates a data stream from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
- */
- public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input
- .FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) throws
- IOException {
- return readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance());
- }
-
- /**
* Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
* decoded by the system's default character set. On the termination of the socket server connection retries can be
* initiated.
@@ -908,29 +857,6 @@ public abstract class StreamExecutionEnvironment {
}
/**
- * Creates a data stream from the given {@link org.apache.hadoop.mapred.InputFormat}.
- */
- public <K, V> DataStreamSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K, V>
- mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
- HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
-
- return createInput(hadoopInputFormat, TypeExtractor.getInputFormatTypes(hadoopInputFormat), "Hadoop " +
- "Input source");
- }
-
- /**
- * Creates a data stream from the given {@link org.apache.hadoop.mapred.InputFormat}.
- */
- public <K, V> DataStreamSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K, V>
- mapredInputFormat, Class<K> key, Class<V> value, Job job) {
- org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink
- .api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
-
- return createInput(hadoopInputFormat, TypeExtractor.getInputFormatTypes(hadoopInputFormat), "Hadoop Input " +
- "source");
- }
-
- /**
* Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}. The data stream will not be immediately
* created - instead, this method returns a data stream that will be lazily created from the input format once the
* program is executed.
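With these convenience methods removed, the same wiring remains available through the generic createInput(...) path that the deleted helpers delegated to. A minimal sketch mirroring the body of the removed createHadoopInput; the input path and the TextInputFormat key/value types are placeholders, and it assumes the HadoopInputFormat wrapper from the flink-java Hadoop compatibility package (which the removed code itself used) is on the classpath:

    import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class HadoopStreamSource {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            JobConf job = new JobConf();
            org.apache.hadoop.mapred.FileInputFormat.addInputPath(
                    job, new org.apache.hadoop.fs.Path("hdfs://host:port/file/path"));

            // Wrap the Hadoop format so the generic createInput() can drive it,
            // exactly as the removed createHadoopInput() helper did internally.
            HadoopInputFormat<LongWritable, Text> format = new HadoopInputFormat<LongWritable, Text>(
                    new TextInputFormat(), LongWritable.class, Text.class, job);

            DataStream<Tuple2<LongWritable, Text>> lines = env.createInput(format);
            lines.print();
            env.execute();
        }
    }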
http://git-wip-us.apache.org/repos/asf/flink/blob/145b2ba7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index cbbd409..c70c1df 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -109,5 +109,10 @@ public class StreamExecutionEnvironmentTest {
public Object next() {
return null;
}
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/145b2ba7/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 5999625..865b484 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -18,18 +18,20 @@
package org.apache.flink.streaming.api.scala
-import scala.reflect.ClassTag
import com.esotericsoftware.kryo.Serializer
-import org.apache.commons.lang.Validate
-import org.joda.time.Instant
+import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.runtime.state.StateHandleProvider
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}
+import org.apache.flink.types.StringValue
+import org.apache.flink.util.SplittableIterator
+
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import org.apache.flink.runtime.state.StateHandleProvider
class StreamExecutionEnvironment(javaEnv: JavaEnv) {
@@ -217,6 +219,61 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
javaEnv.readTextFile(filePath)
/**
+ * Creates a data stream that represents the Strings produced by reading the given file
+ * line wise. The character set with the given name will be used to read the files.
+ */
+ def readTextFile(filePath: String, charsetName: String): DataStream[String] =
+ javaEnv.readTextFile(filePath, charsetName)
+
+ /**
+ * Creates a data stream that represents the strings produced by reading the given file
+ * line wise. This method is similar to the standard text file reader, but it produces
+ * a data stream with mutable StringValue objects, rather than Java Strings.
+ * StringValues can be used to tune implementations to be less object and garbage
+ * collection heavy. The file will be read with the system's default character set.
+ */
+ def readTextFileWithValue(filePath: String): DataStream[StringValue] =
+ javaEnv.readTextFileWithValue(filePath)
+
+ /**
+ * Creates a data stream that represents the strings produced by reading the given file
+ * line wise. This method is similar to the standard text file reader, but it produces
+ * a data stream with mutable StringValue objects, rather than Java Strings.
+ * StringValues can be used to tune implementations to be less object and garbage
+ * collection heavy. The boolean flag indicates whether to skip lines that cannot
+ * be read with the given character set.
+ */
+ def readTextFileWithValue(filePath: String, charsetName: String, skipInvalidLines: Boolean):
+ DataStream[StringValue] =
+ javaEnv.readTextFileWithValue(filePath, charsetName, skipInvalidLines)
+
+ /**
+ * Reads the given file with the given input format. The file path should be passed
+ * as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+ */
+ def readFile[T: ClassTag : TypeInformation](inputFormat: FileInputFormat[T], filePath: String):
+ DataStream[T] =
+ javaEnv.readFile(inputFormat, filePath)
+
+ /**
+ * Creates a data stream that represents the primitive type produced by reading the given file
+ * line wise. The file path should be passed as a URI (e.g., "file:///some/local/file" or
+ * "hdfs://host:port/file/path").
+ */
+ def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, typeClass: Class[T]):
+ DataStream[T] =
+ javaEnv.readFileOfPrimitives(filePath, typeClass)
+
+ /**
+ * Creates a data stream that represents the primitive type produced by reading the given file
+ * line wise. The file path should be passed as a URI (e.g., "file:///some/local/file" or
+ * "hdfs://host:port/file/path").
+ */
+ def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, delimiter: String,
+ typeClass: Class[T]): DataStream[T] =
+ javaEnv.readFileOfPrimitives(filePath, delimiter, typeClass)
+
+ /**
* Creates a DataStream that contains the contents of files created while
* the system watches the given path. The files will be read with the system's
* default character set. The user can check the monitoring interval in milliseconds,
@@ -231,24 +288,37 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
/**
* Creates a new DataStream that contains the strings received infinitely
* from socket. Received strings are decoded by the system's default
- * character set.
- *
+ * character set. The maximum retry interval is specified in seconds; in case
+ * of a temporary service outage, reconnection is attempted every second.
*/
- def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] =
- javaEnv.socketTextStream(hostname, port, delimiter)
+ def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0):
+ DataStream[String] =
+ javaEnv.socketTextStream(hostname, port, delimiter, maxRetry)
/**
- * Creates a new DataStream that contains the strings received infinitely
- * from socket. Received strings are decoded by the system's default
- * character set, uses '\n' as delimiter.
- *
+ * Generic method to create an input data stream with a specific input format.
+ * Since all data streams need specific information about their types, this method needs to
+ * determine the type of the data produced by the input format. It will attempt to determine the
+ * data type by reflection, unless the input format implements the ResultTypeQueryable interface.
*/
- def socketTextStream(hostname: String, port: Int): DataStream[String] =
- javaEnv.socketTextStream(hostname, port)
+ def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
+ javaEnv.createInput(inputFormat)
+
+ /**
+ * Generic method to create an input data stream with a specific input format.
+ * Since all data streams need specific information about their types, this method needs to
+ * determine the type of the data produced by the input format. It will attempt to determine the
+ * data type by reflection, unless the input format implements the ResultTypeQueryable interface.
+ */
+ def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _],
+ typeInfo: TypeInformation[T]): DataStream[T] =
+ javaEnv.createInput(inputFormat, typeInfo)
/**
* Creates a new DataStream that contains a sequence of numbers.
*
+ * Note that this operation will result in a non-parallel data source, i.e. a data source with
+ * a parallelism of one.
*/
def generateSequence(from: Long, to: Long): DataStream[Long] = {
new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
@@ -256,10 +326,18 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
}
/**
+ * Creates a new DataStream that contains a sequence of numbers in a parallel fashion.
+ */
+ def generateParallelSequence(from: Long, to: Long): DataStream[Long] = {
+ new DataStream[java.lang.Long](javaEnv.generateParallelSequence(from, to)).
+ asInstanceOf[DataStream[Long]]
+ }
+
+ /**
* Creates a DataStream that contains the given elements. The elements must all be of the
- * same type and must be serializable.
+ * same type.
*
- * * Note that this operation will result in a non-parallel data source, i.e. a data source with
+ * Note that this operation will result in a non-parallel data source, i.e. a data source with
* a parallelism of one.
*/
def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
@@ -286,6 +364,26 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
}
/**
+ * Creates a DataStream from the given [[Iterator]].
+ *
+ * Note that this operation will result in a non-parallel data source, i.e. a data source with
+ * a parallelism of one.
+ */
+ def fromCollection[T: ClassTag : TypeInformation](data: Iterator[T]): DataStream[T] = {
+ val typeInfo = implicitly[TypeInformation[T]]
+ javaEnv.fromCollection(data.asJava, typeInfo)
+ }
+
+ /**
+ * Creates a DataStream from the given [[SplittableIterator]].
+ */
+ def fromParallelCollection[T: ClassTag : TypeInformation](data: SplittableIterator[T]):
+ DataStream[T] = {
+ val typeInfo = implicitly[TypeInformation[T]]
+ javaEnv.fromParallelCollection(data, typeInfo)
+ }
+
+ /**
* Create a DataStream using a user defined source function for arbitrary
* source functionality. By default sources have a parallelism of 1.
* To enable parallel execution, the user defined source should implement
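The Scala methods above are thin wrappers over the Java environment, so their behaviour can be read off the equivalent Java calls. A hedged sketch: the file URI, host, port and retry value are placeholder inputs, and it assumes the four-argument socket overload that the Scala default parameters map onto:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.types.StringValue;

    public class NewSourceMethods {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Text file read with an explicit character set.
            DataStream<String> utf8Lines = env.readTextFile("file:///some/local/file", "UTF-8");

            // Mutable StringValue variant; the flag skips lines the charset cannot decode.
            DataStream<StringValue> values =
                    env.readTextFileWithValue("file:///some/local/file", "UTF-8", true);

            // Socket source: '\n' delimiter, reconnect once per second for up to 60 seconds.
            DataStream<String> socket = env.socketTextStream("localhost", 9999, '\n', 60);

            socket.print();
            env.execute();
        }
    }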
[4/6] flink git commit: [FLINK-1687] [streaming] [api-breaking]
fromCollection and generateSequence rework
Posted by mb...@apache.org.
[FLINK-1687] [streaming] [api-breaking] fromCollection and generateSequence rework
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/120bd0f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/120bd0f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/120bd0f4
Branch: refs/heads/master
Commit: 120bd0f428fd4ca4051d052ee9a79728d55e72d7
Parents: 1c3b67e
Author: mbalassi <mb...@apache.org>
Authored: Sat May 23 22:34:06 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri May 29 15:08:32 2015 +0200
----------------------------------------------------------------------
.../environment/StreamExecutionEnvironment.java | 28 +++--
.../functions/source/FromIteratorFunction.java | 3 -
.../source/FromSplittableIteratorFunction.java | 52 +++++++++
.../functions/source/GenSequenceFunction.java | 55 ---------
.../api/StreamExecutionEnvironmentTest.java | 113 +++++++++++++++++++
.../api/collector/DirectedOutputTest.java | 2 +-
.../api/complex/ComplexIntegrationTest.java | 6 +-
.../streaming/api/operators/ProjectTest.java | 2 +-
.../api/streamtask/StreamVertexTest.java | 2 +-
9 files changed, 188 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
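Before the diffs, a short sketch of what the rework means at the call site: element types no longer need to implement Serializable, generateSequence is now an explicitly non-parallel source, and generateParallelSequence is its parallel counterpart. The Point POJO is illustrative only:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SequenceRework {
        // A plain POJO; note that it does NOT implement java.io.Serializable,
        // which the removed <OUT extends Serializable> bound used to require.
        public static class Point {
            public long x;
            public Point() {}
            public Point(long x) { this.x = x; }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Point> points = env.fromElements(new Point(1), new Point(2));

            // Non-parallel source: fixed to a parallelism of one...
            DataStream<Long> seq = env.generateSequence(1, 1000);
            // ...and its parallel counterpart, backed by a NumberSequenceIterator.
            DataStream<Long> parSeq = env.generateParallelSequence(1, 1000);

            parSeq.print();
            env.execute();
        }
    }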
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index a1fe60d..09a8788 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -55,7 +55,7 @@ import org.apache.flink.streaming.api.functions.source.FileReadFunction;
import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
-import org.apache.flink.streaming.api.functions.source.GenSequenceFunction;
+import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -418,7 +418,7 @@ public abstract class StreamExecutionEnvironment {
if (from > to) {
throw new IllegalArgumentException("Start of sequence must not be greater than the end");
}
- return addSource(new GenSequenceFunction(from, to), "Sequence Source");
+ return fromCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Sequence Source");
}
/**
@@ -432,8 +432,8 @@ public abstract class StreamExecutionEnvironment {
* @return A data stream containing all numbers in the [from, to] interval
*/
public DataStreamSource<Long> generateParallelSequence(long from, long to) {
- return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Parellel " +
- "Sequence source");
+ return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Parallel " +
+ "Sequence Source");
}
/**
@@ -456,7 +456,7 @@ public abstract class StreamExecutionEnvironment {
* The type of the returned data stream
* @return The data stream representing the given array of elements
*/
- public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
+ public <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
if (data.length == 0) {
throw new IllegalArgumentException(
"fromElements needs at least one element as argument");
@@ -489,7 +489,7 @@ public abstract class StreamExecutionEnvironment {
* The type of the returned data stream
* @return The data stream representing the given collection
*/
- public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
+ public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
Preconditions.checkNotNull(data, "Collection must not be null");
if (data.isEmpty()) {
throw new IllegalArgumentException("Collection must not be empty");
@@ -518,7 +518,7 @@ public abstract class StreamExecutionEnvironment {
* The type of the returned data stream
* @return The data stream representing the given collection
*/
- public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT>
+ public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT>
typeInfo) {
Preconditions.checkNotNull(data, "Collection must not be null");
if (data.isEmpty()) {
@@ -552,7 +552,7 @@ public abstract class StreamExecutionEnvironment {
* @return The data stream representing the elements in the iterator
* @see #fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation)
*/
- public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) {
+ public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) {
return fromCollection(data, TypeExtractor.getForClass(type));
}
@@ -578,7 +578,7 @@ public abstract class StreamExecutionEnvironment {
* The type of the returned data stream
* @return The data stream representing the elements in the iterator
*/
- public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT>
+ public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT>
typeInfo) {
Preconditions.checkNotNull(data, "The iterator must not be null");
if (!(data instanceof Serializable)) {
@@ -589,6 +589,12 @@ public abstract class StreamExecutionEnvironment {
return addSource(function, "Collection Source").returns(typeInfo);
}
+ // private helper for passing different names
+ private <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> iterator, TypeInformation<OUT>
+ typeInfo, String operatorName) {
+ return addSource(new FromIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
+ }
+
/**
* Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
* framework to create a parallel data stream source that returns the elements in the iterator. The iterator
@@ -636,13 +642,13 @@ public abstract class StreamExecutionEnvironment {
*/
public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
typeInfo) {
- return fromParallelCollection(iterator, typeInfo, "Parallel Collection source");
+ return fromParallelCollection(iterator, typeInfo, "Parallel Collection Source");
}
// private helper for passing different names
private <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
typeInfo, String operatorName) {
- return addSource(new FromIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
+ return addSource(new FromSplittableIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
index d46b1f2..125b88b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
@@ -17,9 +17,6 @@
package org.apache.flink.streaming.api.functions.source;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
import java.util.Iterator;
public class FromIteratorFunction<T> implements SourceFunction<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
new file mode 100644
index 0000000..fd86858
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.SplittableIterator;
+
+import java.util.Iterator;
+
+public class FromSplittableIteratorFunction<T> extends RichParallelSourceFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The complete iterator, to be split among the parallel subtasks. */
+ private SplittableIterator<T> fullIterator;
+
+ /** The split assigned to this subtask; created in open(). */
+ private transient Iterator<T> iterator;
+
+ public FromSplittableIteratorFunction(SplittableIterator<T> iterator) {
+ this.fullIterator = iterator;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ int numberOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
+ iterator = fullIterator.split(numberOfSubTasks)[indexOfThisSubTask];
+ }
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return !iterator.hasNext();
+ }
+
+ @Override
+ public T next() throws Exception {
+ return iterator.next();
+ }
+}
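A usage sketch for the new source function: it is what fromParallelCollection instantiates, and open() hands each parallel subtask one split of the iterator. NumberSequenceIterator and BasicTypeInfo.LONG_TYPE_INFO are the same classes the reworked generateSequence uses above:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.NumberSequenceIterator;

    public class ParallelIteratorSource {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // NumberSequenceIterator is a SplittableIterator, so this source runs
            // with the environment's parallelism; each subtask reads its own split.
            DataStream<Long> numbers = env.fromParallelCollection(
                    new NumberSequenceIterator(1L, 1000L), BasicTypeInfo.LONG_TYPE_INFO);

            numbers.print();
            env.execute();
        }
    }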
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
deleted file mode 100644
index 7d302d2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.NumberSequenceIterator;
-
-/**
- * Source Function used to generate the number sequence
- *
- */
-public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
-
- private static final long serialVersionUID = 1L;
-
- private NumberSequenceIterator fullIterator;
- private NumberSequenceIterator splitIterator;
-
- public GenSequenceFunction(long from, long to) {
- fullIterator = new NumberSequenceIterator(from, to);
- }
-
- @Override
- public void open(Configuration config) {
- int splitNumber = getRuntimeContext().getIndexOfThisSubtask();
- int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
- splitIterator = fullIterator.split(numOfSubTasks)[splitNumber];
- }
-
- @Override
- public boolean reachedEnd() throws Exception {
- return !splitIterator.hasNext();
- }
-
- @Override
- public Long next() throws Exception {
- return splitIterator.next();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
new file mode 100644
index 0000000..cbbd409
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.SplittableIterator;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertTrue;
+
+public class StreamExecutionEnvironmentTest {
+
+ private static final long MEMORYSIZE = 32;
+ private static final int PARALLELISM = 4;
+
+ @Test
+ public void testFromCollectionParallelism() {
+ TypeInformation<Object> typeInfo = TypeExtractor.getForClass(Object.class);
+ StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+ boolean seenExpectedException = false;
+
+ try {
+ DataStream<Object> dataStream1 = env.fromCollection(new DummySplittableIterator(), typeInfo)
+ .setParallelism(4);
+ } catch (IllegalArgumentException e) {
+ seenExpectedException = true;
+ }
+
+ DataStream<Object> dataStream2 = env.fromParallelCollection(new DummySplittableIterator(), typeInfo)
+ .setParallelism(4);
+
+ String plan = env.getExecutionPlan();
+
+ assertTrue("Expected Exception for setting parallelism was not thrown.", seenExpectedException);
+ assertTrue("Parallelism for dataStream1 is not right.",
+ plan.contains("\"contents\":\"Collection Source\",\"parallelism\":1"));
+ assertTrue("Parallelism for dataStream2 is not right.",
+ plan.contains("\"contents\":\"Parallel Collection Source\",\"parallelism\":4"));
+ }
+
+ @Test
+ public void testGenerateSequenceParallelism() throws Exception {
+ StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+ boolean seenExpectedException = false;
+
+ try {
+ DataStream<Long> dataStream1 = env.generateSequence(0, 0).setParallelism(4);
+ } catch (IllegalArgumentException e) {
+ seenExpectedException = true;
+ }
+
+ DataStream<Long> dataStream2 = env.generateParallelSequence(0, 0).setParallelism(4);
+
+ String plan = env.getExecutionPlan();
+
+ assertTrue("Expected Exception for setting parallelism was not thrown.", seenExpectedException);
+ assertTrue("Parallelism for dataStream1 is not right.",
+ plan.contains("\"contents\":\"Sequence Source\",\"parallelism\":1"));
+ assertTrue("Parallelism for dataStream2 is not right.",
+ plan.contains("\"contents\":\"Parallel Sequence Source\",\"parallelism\":4"));
+ }
+
+ public static class DummySplittableIterator extends SplittableIterator {
+ private static final long serialVersionUID = 1312752876092210499L;
+
+ @Override
+ public Iterator[] split(int numPartitions) {
+ return new Iterator[0];
+ }
+
+ @Override
+ public int getMaximumNumberOfSplits() {
+ return 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public Object next() {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index fc3e36f..56b6ae8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -101,7 +101,7 @@ public class DirectedOutputTest {
TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>();
TestListResultSink<Long> allSink = new TestListResultSink<Long>();
- SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
+ SplitDataStream<Long> source = env.generateParallelSequence(1, 11).split(new MyOutputSelector());
source.select(EVEN).addSink(evenSink);
source.select(ODD, TEN).addSink(oddAndTenSink);
source.select(EVEN, ODD).addSink(evenAndOddSink);
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 908671d..d321851 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -222,8 +222,8 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
env.setBufferTimeout(0);
- DataStream<Long> sourceStream31 = env.generateSequence(1, 10000);
- DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000);
+ DataStream<Long> sourceStream31 = env.generateParallelSequence(1, 10000);
+ DataStream<Long> sourceStream32 = env.generateParallelSequence(10001, 20000);
sourceStream31.filter(new PrimeFilterFunction())
.window(Count.of(100))
@@ -308,7 +308,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
env.setBufferTimeout(0);
- DataStream<Long> dataStream51 = env.generateSequence(1, 5)
+ DataStream<Long> dataStream51 = env.generateParallelSequence(1, 5)
.map(new MapFunction<Long, Long>() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
index d9cc607..c09afee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
@@ -92,7 +92,7 @@ public class ProjectTest implements Serializable {
StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
- env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
+ env.generateParallelSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
@Override
public Tuple3<Long, Character, Double> map(Long value) throws Exception {
return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
index 0bb1848..90cb7b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
@@ -157,7 +157,7 @@ public class StreamVertexTest {
StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
- DataStream<Long> generatedSequence = env.generateSequence(0, 3);
+ DataStream<Long> generatedSequence = env.generateParallelSequence(0, 3);
fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
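For readers following the test change above: connect pairs two streams of different element types, and a CoMapFunction maps both to a common output type via map1/map2, as described in the ConnectedDataStream javadoc in the next commit. A sketch with illustrative names (the test's CoMap and SetSink classes are not reproduced here):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoMapFunction;

    public class ConnectExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> strings = env.fromElements("aa", "bb", "cc");
            DataStream<Long> longs = env.generateParallelSequence(0, 3);

            // map1 handles elements of the first input, map2 those of the second;
            // each call returns exactly one element of the common output type.
            DataStream<String> merged = strings.connect(longs)
                    .map(new CoMapFunction<String, Long, String>() {
                        @Override
                        public String map1(String value) { return value; }

                        @Override
                        public String map2(Long value) { return value.toString(); }
                    });

            merged.print();
            env.execute();
        }
    }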
[6/6] flink git commit: [streaming] Build warnings eliminated from
streaming-core
Posted by mb...@apache.org.
[streaming] Build warnings eliminated from streaming-core
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0930179f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0930179f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0930179f
Branch: refs/heads/master
Commit: 0930179f41dec179cb60882f699ee4ce8ba34d61
Parents: 0fd6023
Author: mbalassi <mb...@apache.org>
Authored: Thu May 28 16:32:37 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri May 29 15:08:33 2015 +0200
----------------------------------------------------------------------
.../api/datastream/ConnectedDataStream.java | 15 ++------
.../streaming/api/datastream/DataStream.java | 37 ++++++++++----------
.../api/datastream/DiscretizedStream.java | 7 ++--
.../api/datastream/GroupedDataStream.java | 11 ++----
.../api/datastream/IterativeDataStream.java | 6 ++--
.../api/datastream/WindowedDataStream.java | 4 +--
.../datastream/temporal/TemporalOperator.java | 4 +--
.../environment/StreamExecutionEnvironment.java | 35 ++++++++----------
.../functions/source/FileSourceFunction.java | 1 +
.../api/operators/windowing/WindowMerger.java | 1 +
.../ExtractionAwareDeltaFunction.java | 6 ++--
.../streaming/api/windowing/helper/Delta.java | 1 +
.../streaming/api/windowing/helper/Time.java | 2 +-
.../policy/CloneableMultiEvictionPolicy.java | 4 +--
.../windowing/policy/CountEvictionPolicy.java | 2 +-
.../api/windowing/policy/DeltaPolicy.java | 1 +
.../api/windowing/policy/TimeTriggerPolicy.java | 14 +++-----
.../streaming/runtime/tasks/OutputHandler.java | 2 +-
.../api/StreamExecutionEnvironmentTest.java | 1 +
.../SlidingTimeGroupedPreReducerTest.java | 1 +
.../windowbuffer/SlidingTimePreReducerTest.java | 1 +
21 files changed, 69 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 7362802..02db538 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.api.datastream;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.Utils;
@@ -28,8 +27,6 @@ import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoReduceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
@@ -232,9 +229,7 @@ public class ConnectedDataStream<IN1, IN2> {
* the output to a common type. The transformation calls a
* {@link CoMapFunction#map1} for each element of the first input and
* {@link CoMapFunction#map2} for each element of the second input. Each
- * CoMapFunction call returns exactly one element. The user can also extend
- * {@link RichCoMapFunction} to gain access to other features provided by
- * the {@link RichFuntion} interface.
+ * CoMapFunction call returns exactly one element.
*
* @param coMapper
* The CoMapFunction used to jointly transform the two input
@@ -258,9 +253,7 @@ public class ConnectedDataStream<IN1, IN2> {
* {@link CoFlatMapFunction#flatMap1} for each element of the first input
* and {@link CoFlatMapFunction#flatMap2} for each element of the second
* input. Each CoFlatMapFunction call returns any number of elements
- * including none. The user can also extend {@link RichFlatMapFunction} to
- * gain access to other features provided by the {@link RichFuntion}
- * interface.
+ * including none.
*
* @param coFlatMapper
* The CoFlatMapFunction used to jointly transform the two input
@@ -285,9 +278,7 @@ public class ConnectedDataStream<IN1, IN2> {
* sliding batch/window of the data stream. If the connected data stream is
* grouped then the reducer is applied on every group of elements sharing
* the same key. This type of reduce is much faster than reduceGroup since
- * the reduce function can be applied incrementally. The user can also
- * extend the {@link RichCoReduceFunction} to gain access to other features
- * provided by the {@link RichFuntion} interface.
+ * the reduce function can be applied incrementally.
*
* @param coReducer
* The {@link CoReduceFunction} that will be called for every
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 5165ec7..7edbec5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -378,7 +378,7 @@ public class DataStream<OUT> {
* instances of the next processing operator.
*
* @param keySelector
- * @return
+ * @return The partitioned DataStream
*/
protected DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
return setConnectionType(new FieldsPartitioner<OUT>(clean(keySelector)));
@@ -607,17 +607,13 @@ public class DataStream<OUT> {
* Initiates a Project transformation on a {@link Tuple} {@link DataStream}.<br/>
* <b>Note: Only Tuple DataStreams can be projected.</b></br> The
* transformation projects each Tuple of the DataStream onto a (sub)set of
- * fields.</br> This method returns a {@link StreamProjection} on which
- * {@link StreamProjection#types(Class)} needs to be called to completed the
- * transformation.
+ * fields.
*
* @param fieldIndexes
* The field indexes of the input tuples that are retained. The
* order of fields in the output tuple corresponds to the order
* of field indexes.
- * @return A StreamProjection that needs to be converted into a DataStream
- * to complete the project transformation by calling
- * {@link StreamProjection#types(Class)}.
+ * @return The projected DataStream
*
* @see Tuple
* @see DataStream
@@ -639,8 +635,8 @@ public class DataStream<OUT> {
* {@link StreamCrossOperator#onWindow} should be called to define the
* window.
* <p>
- * Call {@link StreamCrossOperator.CrossWindow#with(crossFunction)} to
- * define a custom cross function.
+ * Call {@link StreamCrossOperator.CrossWindow#with(org.apache.flink.api.common.functions.CrossFunction)}
+ * to define a custom cross function.
*
* @param dataStreamToCross
* The other DataStream with which this DataStream is crossed.
@@ -658,14 +654,17 @@ public class DataStream<OUT> {
* {@link DataStream}s on key equality over a specified time window.</br>
*
* This method returns a {@link StreamJoinOperator} on which the
- * {@link StreamJoinOperator#onWindow} should be called to define the
- * window, and then the {@link StreamJoinOperator.JoinWindow#where} and
- * {@link StreamJoinOperator.JoinPredicate#equalTo} can be used to define
- * the join keys.</p> The user can also use the
- * {@link StreamJoinOperator.JoinedStream#with(joinFunction)} to apply
- * custom join function.
- *
- * @param other
+ * {@link StreamJoinOperator#onWindow(long, java.util.concurrent.TimeUnit)}
+ * should be called to define the window, and then the
+ * {@link StreamJoinOperator.JoinWindow#where(int...)} and
+ * {@link StreamJoinOperator.JoinPredicate#equalTo(int...)} can be used to define
+ * the join keys.
+ * <p>
+ * The user can also use the
+ * {@link StreamJoinOperator.JoinedStream#with(org.apache.flink.api.common.functions.JoinFunction)}
+ * to apply a custom join function.
+ *
+ * @param dataStreamToJoin
* The other DataStream with which this DataStream is joined.
* @return A {@link StreamJoinOperator} to continue the definition of the
* Join transformation.
@@ -923,11 +922,11 @@ public class DataStream<OUT> {
* to a grouped data stream, the windows (evictions) and slide sizes
* (triggers) will be computed on a per group basis. </br></br> For more
* advanced control over the trigger and eviction policies please refer to
- * {@link #window(trigger, eviction)} </br> </br> For example to create a
+ * {@link #window(TriggerPolicy, EvictionPolicy)} </br> </br> For example to create a
* sum every 5 seconds in a tumbling fashion:</br>
* {@code ds.window(Time.of(5, TimeUnit.SECONDS)).sum(field)} </br></br> To
* create sliding windows use the
- * {@link WindowedDataStream#every(WindowingHelper...)} </br></br> The same
+ * {@link WindowedDataStream#every(WindowingHelper)} </br></br> The same
* example with 3 second slides:</br>
*
* {@code ds.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(3,
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index d70c4b8..4083bb8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -53,9 +53,8 @@ import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation
/**
* A {@link DiscretizedStream} represents a data stream that has been divided
* into windows (predefined chunks). User defined functions such as
- * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow()},
- * {@link #foldWindow(FoldFunction, initialValue)} or aggregations can be
- * applied to the windows.
+ * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow(WindowMapFunction)},
+ * or aggregations can be applied to the windows.
*
* @param <OUT>
* The output type of the {@link DiscretizedStream}
@@ -127,7 +126,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
*
* @param reduceFunction
* The reduce function to be applied on the windows
- * @return
+ * @return The reduced DataStream
*/
protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index aed02dc..62e7781 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
@@ -32,7 +31,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
/**
* A GroupedDataStream represents a {@link DataStream} which has been
* partitioned by the given {@link KeySelector}. Operators like {@link #reduce},
- * {@link #batchReduce} etc. can be applied on the {@link GroupedDataStream} to
+ * {@link #fold} etc. can be applied on the {@link GroupedDataStream} to
* get additional functionality by the grouping.
*
* @param <OUT>
@@ -67,9 +66,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
* Applies a reduce transformation on the grouped data stream grouped on by
* the given key position. The {@link ReduceFunction} will receive input
* values based on the key value. Only input values with the same key will
- * go to the same reducer.The user can also extend
- * {@link RichReduceFunction} to gain access to other features provided by
- * the {@link RichFuntion} interface.
+ * go to the same reducer.
*
* @param reducer
* The {@link ReduceFunction} that will be called for every
@@ -86,9 +83,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
* Applies a fold transformation on the grouped data stream grouped on by
* the given key position. The {@link FoldFunction} will receive input
* values based on the key value. Only input values with the same key will
- * go to the same folder.The user can also extend {@link RichFoldFunction}
- * to gain access to other features provided by the {@link RichFuntion}
- * interface.
+ * go to the same folder.
*
* @param folder
* The {@link FoldFunction} that will be called for every element
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 178f2eb..581087d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -42,11 +42,11 @@ public class IterativeDataStream<IN> extends
* program part that will be fed back to the start of the iteration. </br>
* </br>A common usage pattern for streaming iterations is to use output
* splitting to send a part of the closing data stream to the head. Refer to
- * {@link SingleOutputStreamOperator#split(OutputSelector)} for more
- * information.
+ * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
+ * for more information.
*
*
- * @param iterationResult
+ * @param iterationTail
* The data stream that is fed back to the next iteration head.
* @return Returns the stream that was fed back to the iteration. In most
* cases no further transformation are applied on this stream.
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index a10c79e..5d769ca 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -76,7 +76,7 @@ import org.apache.flink.streaming.util.keys.KeySelectorUtil;
/**
* A {@link WindowedDataStream} represents a data stream that has been
* discretised into windows. User defined functions such as
- * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow()} or aggregations
+ * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow(WindowMapFunction)} or aggregations
* can be applied to the windows. The results of these transformations are also
* WindowedDataStreams of the same discretisation unit.
*
@@ -610,7 +610,7 @@ public class WindowedDataStream<OUT> {
* stream's underlying type. A dot can be used to drill down into objects,
* as in {@code "field1.getInnerField2()" }.
*
- * @param positionToSum
+ * @param field
* The field to sum
* @return The transformed DataStream.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
index 75332d1..3fe7eb7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
@@ -69,7 +69,7 @@ public abstract class TemporalOperator<I1, I2, OP extends TemporalWindow<OP>> {
* transformed. To define sliding windows call {@link TemporalWindow#every}
* on the resulting operator.
*
- * @param windowSize
+ * @param length
* The size of the window in milliseconds.
* @param timeStamp1
* The timestamp used to extract time from the elements of the
@@ -89,7 +89,7 @@ public abstract class TemporalOperator<I1, I2, OP extends TemporalWindow<OP>> {
* transformed. To define sliding windows call {@link TemporalWindow#every}
* on the resulting operator.
*
- * @param windowSize
+ * @param length
* The size of the window in milliseconds.
* @param timeStamp1
* The timestamp used to extract time from the elements of the
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 651da63..da1ba73 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -251,7 +251,7 @@ public abstract class StreamExecutionEnvironment {
* Sets the {@link StateHandleProvider} used for storing operator state
* checkpoints when checkpointing is enabled.
* <p>
- * An example would be using a {@link FileStateHandle#createProvider(Path)}
+ * An example would be using a {@link FileStateHandle#createProvider(String)}
* to use any Flink supported file system as a state backend
*
*/
@@ -857,16 +857,14 @@ public abstract class StreamExecutionEnvironment {
}
/**
- * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}. The data stream will not be immediately
- * created - instead, this method returns a data stream that will be lazily created from the input format once the
- * program is executed.
+ * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}.
* <p/>
* Since all data streams need specific information about their types, this method needs to determine the type of
* the data produced by the input format. It will attempt to determine the data type by reflection, unless the
* input
- * format implements the {@link org.apache.flink.api.java.typeutils .ResultTypeQueryable} interface. In the latter
- * case, this method will invoke the {@link org.apache.flink.api.java.typeutils
- * .ResultTypeQueryable#getProducedType()} method to determine data type produced by the input format.
+ * format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface. In the latter
+ * case, this method will invoke the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()}
+ * method to determine the data type produced by the input format.
*
* @param inputFormat
* The input format used to create the data stream
@@ -879,14 +877,12 @@ public abstract class StreamExecutionEnvironment {
}
/**
- * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}. The data stream will not be immediately
- * created - instead, this method returns a data stream that will be lazily created from the input format once the
- * program is executed.
- * <p/>
+ * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}.
+ * <p>
* The data stream is typed to the given TypeInformation. This method is intended for input formats where the
* return
- * type cannot be determined by reflection analysis, and that do not implement the {@link
- * org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+ * type cannot be determined by reflection analysis, and that do not implement the
+ * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
*
* @param inputFormat
* The input format used to create the data stream
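A short sketch of the two createInput variants described above (path and type
choices are placeholders):

    // Type determined by reflection, or via ResultTypeQueryable if implemented:
    DataStream<String> lines = env.createInput(
            new TextInputFormat(new Path("file:///tmp/input")));

    // Type supplied explicitly, for formats where reflection cannot determine it:
    DataStream<String> typedLines = env.createInput(
            new TextInputFormat(new Path("file:///tmp/input")),
            BasicTypeInfo.STRING_TYPE_INFO);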
@@ -908,14 +904,13 @@ public abstract class StreamExecutionEnvironment {
}
/**
- * Ads a data source with a custom type information thus opening a {@link org.apache.flink.streaming.api
- * .datastream.DataStream}. Only in very special cases does the user need to support type information. Otherwise
- * use
- * {@link #addSource(org.apache.flink.streaming.api.function.source.SourceFunction)}
- * <p/>
+ * Adds a data source with custom type information, thus opening a
+ * {@link org.apache.flink.streaming.api.datastream.DataStream}. Only in very special cases does the user need
+ * to supply type information. Otherwise use {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
+ * <p>
* By default sources have a parallelism of 1. To enable parallel execution, the user defined source should
- * implement {@link org.apache.flink.streaming.api.function.source.ParallelSourceFunction} or extend {@link
- * org.apache.flink.streaming.api.function.source.RichParallelSourceFunction}. In these cases the resulting source
+ * implement {@link org.apache.flink.streaming.api.functions.source.ParallelSourceFunction} or extend {@link
+ * org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction}. In these cases the resulting source
* will have the parallelism of the environment. To change this afterwards call {@link
* org.apache.flink.streaming.api.datastream.DataStreamSource#setParallelism(int)}
*
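For illustration, a hedged sketch of the documented contract (MyParallelSource
is a hypothetical class implementing the relocated
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction, and the
overload taking TypeInformation is assumed from this Javadoc):

    // Runs with the environment's parallelism because the source is parallel;
    // calling setParallelism afterwards overrides that.
    env.addSource(new MyParallelSource(), BasicTypeInfo.LONG_TYPE_INFO)
            .setParallelism(4);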
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
index af00d4d..c6f4421 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
@@ -40,6 +40,7 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
private Iterator<InputSplit> splitIterator;
private transient OUT nextElement;
+ @SuppressWarnings("unchecked")
public FileSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
this.format = (InputFormat<OUT, InputSplit>) format;
this.typeInfo = typeInfo;
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
index 9b33474..bb8cfaa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
@@ -43,6 +43,7 @@ public class WindowMerger<T> extends AbstractStreamOperator<StreamWindow<T>>
}
@Override
+ @SuppressWarnings("unchecked")
public void processElement(StreamWindow<T> nextWindow) throws Exception {
StreamWindow<T> current = windows.get(nextWindow.windowID);
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
index 81d01a4..3e9f2ca 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
@@ -47,10 +47,10 @@ public abstract class ExtractionAwareDeltaFunction<DATA, TO> implements DeltaFun
/**
* This method takes the two data points and runs the set extractor on them.
- * The delta function implemented at {@link getNestedDelta} is then called
+ * The delta function implemented at {@link #getNestedDelta} is then called
* with the extracted data. In case no extractor is set the input data gets
- * passes to {@link getNestedDelta} as-is. The return value is just
- * forwarded from {@link getNestedDelta}.
+ * passed to {@link #getNestedDelta} as-is. The return value is just
+ * forwarded from {@link #getNestedDelta}.
*
* @param oldDataPoint
* the older data point as raw data (before extraction).
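A minimal sketch of the extract-then-delta flow the links above now resolve to
(the Extractor type and field layout are illustrative assumptions):

    // Compares only the extracted Double field of each pair; with no extractor
    // set, getNestedDelta would receive the raw input values instead.
    public class TemperatureDelta extends ExtractionAwareDeltaFunction<Tuple2<String, Double>, Double> {
        public TemperatureDelta(Extractor<Tuple2<String, Double>, Double> extractor) {
            super(extractor);
        }

        @Override
        public double getNestedDelta(Double oldPoint, Double newPoint) {
            return Math.abs(newPoint - oldPoint);
        }
    }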
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
index bcb548f..255049d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
@@ -94,6 +94,7 @@ public class Delta<DATA> extends WindowingHelper<DATA> {
return new Delta<DATA>(deltaFunction, initVal, threshold);
}
+ @SuppressWarnings("unchecked")
private void instantiateTypeSerializer(){
if (executionConfig == null){
throw new UnsupportedOperationException("ExecutionConfig has to be set to instantiate TypeSerializer.");
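As a usage sketch (the argument order of Delta.of mirrors the constructor shown
above and is an assumption; myDeltaFunction and initVal are placeholders):

    // Triggers a new window once the delta between the last trigger point and
    // the current element exceeds 0.5, starting the comparison from initVal.
    dataStream.window(Delta.of(0.5, myDeltaFunction, initVal));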
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index 0089d26..022f975 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
/**
* This helper represents a time-based trigger or eviction policy. By default the
* time is measured with {@link System#currentTimeMillis()} in
- * {@link DefaultTimeStamp}.
+ * {@link SystemTimestamp}.
*
* @param <DATA>
* The data type which is handled by the time stamp used in the
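For context, the common call site of this helper, which uses the
system-time-backed SystemTimestamp default described above:

    // Tumbling 5-second windows measured with System.currentTimeMillis().
    dataStream.window(Time.of(5, TimeUnit.SECONDS));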
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
index d86b174..5adddc4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
@@ -44,7 +44,7 @@ public class CloneableMultiEvictionPolicy<DATA> extends MultiEvictionPolicy<DATA
*
* When using this constructor the MAX strategy is used by default. You can
* select other strategies using
- * {@link CloneableMultiEvictionPolicy#CloneableMultiEvictionPolicy(org.apache.flink.streaming.api.windowing.policy.MultiEvictionPolicy.EvictionStrategy, CloneableEvictionPolicy...)}
+ * {@link CloneableMultiEvictionPolicy#CloneableMultiEvictionPolicy(EvictionStrategy, CloneableEvictionPolicy...)}
* .
*
* @param evictionPolicies
@@ -60,7 +60,7 @@ public class CloneableMultiEvictionPolicy<DATA> extends MultiEvictionPolicy<DATA
* constructor.
*
* @param strategy
- * the strategy to be used. See {@link EvictionStrategy} for a
+ * the strategy to be used. See {@link MultiEvictionPolicy.EvictionStrategy} for a
* list of possible options.
* @param evictionPolicies
* some cloneable policies to be tied together.
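A brief sketch of both constructors (the MIN strategy value and the
single-argument CountEvictionPolicy constructor are assumed):

    // Default constructor: the MAX strategy combines the nested policies.
    new CloneableMultiEvictionPolicy<Integer>(
            new CountEvictionPolicy<Integer>(100),
            new CountEvictionPolicy<Integer>(50));

    // Explicit strategy via the constructor the link above now resolves to.
    new CloneableMultiEvictionPolicy<Integer>(
            MultiEvictionPolicy.EvictionStrategy.MIN,
            new CountEvictionPolicy<Integer>(100),
            new CountEvictionPolicy<Integer>(50));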
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
index 5e0c862..9be25d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
@@ -89,7 +89,7 @@ public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN> {
* will be adjusted respectively but never below zero.
* @param startValue
* A custom start value for the counter of arriving elements.
- * @see CountEvictionPolicy#NextGenCountEvictionPolicy(int, int)
+ * @see CountEvictionPolicy#CountEvictionPolicy(int, int)
*/
public CountEvictionPolicy(int maxElements, int deleteOnEviction, int startValue) {
this.counter = startValue;
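A small sketch of the three-argument constructor the corrected @see tag points
to:

    // Evict once more than 100 elements have arrived, delete 10 of them per
    // eviction, and start the arrival counter at 0.
    CountEvictionPolicy<Integer> eviction =
            new CountEvictionPolicy<Integer>(100, 10, 0);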
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
index 69dd66f..0583176 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
@@ -86,6 +86,7 @@ public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
* TypeSerializer to properly forward the initial value to
* the cluster
*/
+ @SuppressWarnings("unchecked")
public DeltaPolicy(DeltaFunction<DATA> deltaFuntion, DATA init, double threshold, TypeSerializer typeSerializer) {
this.deltaFuntion = deltaFuntion;
this.triggerDataPoint = init;
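For illustration, a sketch of constructing this policy (the serializer lookup
through BasicTypeInfo and the DeltaFunction signature are assumptions):

    DeltaPolicy<Double> trigger = new DeltaPolicy<Double>(
            new DeltaFunction<Double>() {
                @Override
                public double getDelta(Double oldPoint, Double newPoint) {
                    return Math.abs(newPoint - oldPoint);
                }
            },
            0.0, // initial data point, forwarded to the cluster via the serializer
            5.0, // trigger threshold
            BasicTypeInfo.DOUBLE_TYPE_INFO.createSerializer(env.getConfig()));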
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index ef701cf..03984a9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.windowing.policy;
import java.util.LinkedList;
+import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
@@ -46,7 +47,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
/**
* This is mostly the same as
- * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, Timestamp)}. In addition
+ * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimestampWrapper)}. In addition
* to granularity and timestamp a delay can be specified for the first
* trigger. If the start time given by the timestamp is x, the delay is y,
* and the granularity is z, the first trigger will happen at x+y+z.
@@ -57,11 +58,6 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
* @param timestampWrapper
* The {@link TimestampWrapper} to measure the time with. This
* can be either user defined or provided by the API.
- * @param timeWrapper
- * This policy creates fake elements to not miss windows in case
- * no element arrived within the duration of the window. This
- * extractor should wrap a long into such an element of type
- * DATA.
*/
public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) {
this.startTime = timestampWrapper.getStartTime();
@@ -86,9 +82,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
}
/**
- * In case {@link DefaultTimeStamp} is used, a runnable is returned which
+ * In case {@link SystemTimestamp} is used, a runnable is returned which
* triggers based on the current system time. If any other time measure is
- * used the method return null.
+ * used the method returns null.
*
* @param callback
* The object which takes the callbacks for adding fake
@@ -107,7 +103,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
/**
* This method is only called in case the runnable triggers a window end
- * according to the {@link DefaultTimeStamp}.
+ * according to the {@link SystemTimestamp}.
*
* @param callback
* The callback object.
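To make the x+y+z rule above concrete (SystemTimestamp.getWrapper() and the
argument order of the delay constructor are assumptions):

    // start time x = 0, delay y = 500 ms, granularity z = 1000 ms:
    // the first trigger fires at x + y + z = 1500 ms, then every 1000 ms.
    TimeTriggerPolicy<Long> trigger = new TimeTriggerPolicy<Long>(
            1000L, SystemTimestamp.<Long>getWrapper(), 500L);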
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index 38f1231..20aeb89 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -226,8 +226,8 @@ public class OutputHandler<OUT> {
}
@Override
+ @SuppressWarnings("unchecked")
public void collect(T record) {
-
try {
operator.processElement(record);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index c70c1df..5ca4883 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -41,6 +41,7 @@ public class StreamExecutionEnvironmentTest {
private static int PARALLELISM = 4;
@Test
+ @SuppressWarnings("unchecked")
public void testFromCollectionParallelism() {
TypeInformation<Object> typeInfo = TypeExtractor.getForClass(Object.class);
StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
index 18a4748..3f1cba1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
@@ -52,6 +52,7 @@ public class SlidingTimeGroupedPreReducerTest {
KeySelector<Tuple2<Integer, Integer>, ?> tupleKey = new TupleModKey(2);
@Test
+ @SuppressWarnings("unchecked")
public void testPreReduce1() throws Exception {
// This ensures that the buffer is properly cleared after a burst of elements by
// replaying the same sequence of elements with a later timestamp and expecting the same
http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
index a48bc0c..0519da7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
@@ -44,6 +44,7 @@ public class SlidingTimePreReducerTest {
ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer();
@Test
+ @SuppressWarnings("unchecked")
public void testPreReduce1() throws Exception {
// This ensures that the buffer is properly cleared after a burst of elements by
// replaying the same sequence of elements with a later timestamp and expecting the same