You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:54 UTC
[17/51] [abbrv] git commit: [streaming] Moved task configurations to
StreamConfig
[streaming] Moved task configurations to StreamConfig
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/330d8fd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/330d8fd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/330d8fd5
Branch: refs/heads/master
Commit: 330d8fd524ba6cbc1ec96132ee75698d1f0d4af3
Parents: a2c4137
Author: ghermann <re...@gmail.com>
Authored: Thu Jul 24 14:57:28 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:21:11 2014 +0200
----------------------------------------------------------------------
.../flink/streaming/api/StreamConfig.java | 242 +++++++++++++++++++
.../AbstractStreamComponent.java | 53 ++--
.../api/streamcomponent/CoStreamTask.java | 30 +--
.../SingleInputAbstractStreamComponent.java | 13 +-
.../streamcomponent/StreamIterationSink.java | 2 +-
.../streamcomponent/StreamIterationSource.java | 12 +-
.../api/streamcomponent/StreamSink.java | 4 +-
.../api/streamcomponent/StreamSource.java | 14 +-
.../api/streamcomponent/StreamTask.java | 13 +-
.../api/streamcomponent/StreamWindowTask.java | 96 --------
.../examples/window/sum/WindowSumAggregate.java | 67 -----
.../examples/window/sum/WindowSumLocal.java | 43 ----
.../examples/window/sum/WindowSumMultiple.java | 36 ---
.../examples/window/sum/WindowSumSink.java | 31 ---
.../examples/window/sum/WindowSumSource.java | 41 ----
.../wordcount/WindowWordCountCounter.java | 82 -------
.../window/wordcount/WindowWordCountLocal.java | 50 ----
.../window/wordcount/WindowWordCountSink.java | 32 ---
.../wordcount/WindowWordCountSplitter.java | 46 ----
19 files changed, 293 insertions(+), 614 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
new file mode 100644
index 0000000..d677046
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -0,0 +1,242 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.streamcomponent.StreamComponentException;
+import org.apache.flink.streaming.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
+
+public class StreamConfig {
+ private static final String INPUT_TYPE = "inputType_";
+ private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
+ private static final String NUMBER_OF_INPUTS = "numberOfInputs";
+ private static final String OUTPUT_NAME = "outputName_";
+ private static final String PARTITIONER_OBJECT = "partitionerObject_";
+ private static final String USER_DEFINED_NAME = "userDefinedName";
+ private static final String NUMBER_OF_OUTPUT_CHANNELS = "numOfOutputs_";
+ private static final String ITERATION_ID = "iteration-id";
+ private static final String OUTPUT_SELECTOR = "outputSelector";
+ private static final String DIRECTED_EMIT = "directedEmit";
+ private static final String FUNCTION_NAME = "operatorName";
+ private static final String FUNCTION = "operator";
+ private static final String COMPONENT_NAME = "componentName";
+ private static final String SERIALIZEDUDF = "serializedudf";
+ private static final String USER_FUNCTION = "userfunction";
+ private static final String BUFFER_TIMEOUT = "bufferTimeout";
+
+ // DEFAULT VALUES
+
+ private static final boolean DEFAULT_IS_MUTABLE = false;
+
+ private static final long DEFAULT_TIMEOUT = 0;
+
+ // STRINGS
+
+ private static final String MUTABILITY = "isMutable";
+
+ private Configuration config;
+
+ public StreamConfig(Configuration config) {
+ this.config = config;
+ }
+
+ public Configuration getConfiguration() {
+ return config;
+ }
+
+ // CONFIGS
+
+ public void setMutability(boolean isMutable) {
+ config.setBoolean(MUTABILITY, isMutable);
+ }
+
+ public boolean getMutability() {
+ return config.getBoolean(MUTABILITY, DEFAULT_IS_MUTABLE);
+ }
+
+ public void setBufferTimeout(long timeout) {
+ config.setLong(BUFFER_TIMEOUT, timeout);
+ }
+
+ public long getBufferTimeout() {
+ return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
+ }
+
+ public void setUserInvokableClass(Class<? extends StreamComponentInvokable> clazz) {
+ config.setClass(USER_FUNCTION, clazz);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends StreamComponentInvokable> Class<? extends T> getUserInvokableClass() {
+ return (Class<? extends T>) config.getClass(USER_FUNCTION, null);
+ }
+
+ public void setUserInvokableObject(StreamComponentInvokable invokableObject) {
+ try {
+ config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(invokableObject));
+ } catch (SerializationException e) {
+ throw new RuntimeException("Cannot serialize invokable object "
+ + invokableObject.getClass(), e);
+ }
+ }
+
+ public StreamComponentInvokable getUserInvokableObject() {
+ try {
+ return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
+ } catch (Exception e) {
+ new StreamComponentException("Cannot instantiate user function");
+ }
+ return null;
+ }
+
+ public void setComponentName(String componentName) {
+ config.setString(COMPONENT_NAME, componentName);
+ }
+
+ public String getComponentName() {
+ return config.getString(COMPONENT_NAME, null);
+ }
+
+ public void setFunction(byte[] serializedFunction) {
+ config.setBytes(FUNCTION, serializedFunction);
+ }
+
+ public Object getFunction() {
+ try {
+ return SerializationUtils.deserialize(config
+ .getBytes(FUNCTION, null));
+ } catch (SerializationException e) {
+ throw new RuntimeException("Cannot deserialize invokable object", e);
+ }
+ }
+
+ public void setFunctionName(String functionName) {
+ config.setString(FUNCTION_NAME, functionName);
+ }
+
+ public String getFunctionName() {
+ return config.getString(FUNCTION_NAME, "");
+ }
+
+ public void setUserDefinedName(String userDefinedName) {
+ if (userDefinedName != null) {
+ config.setString(USER_DEFINED_NAME, userDefinedName);
+ }
+ }
+
+ public void setDirectedEmit(boolean directedEmit) {
+ config.setBoolean(DIRECTED_EMIT, directedEmit);
+ }
+
+ public boolean getDirectedEmit() {
+ return config.getBoolean(DIRECTED_EMIT, false);
+ }
+
+ public void setOutputSelector(byte[] outputSelector) {
+ config.setBytes(OUTPUT_SELECTOR, outputSelector);
+
+ }
+
+ public <T extends Tuple> OutputSelector<T> getOutputSelector() {
+ try {
+ return deserializeObject(config.getBytes(OUTPUT_SELECTOR, null));
+ } catch (Exception e) {
+ throw new StreamComponentException("Cannot deserialize and instantiate OutputSelector",
+ e);
+ }
+ }
+
+ public void setIterationId(String iterationId) {
+ config.setString(ITERATION_ID, iterationId);
+ }
+
+ public String getIterationId() {
+ return config.getString(ITERATION_ID, "iteration-0");
+ }
+
+ public void setNumberOfOutputChannels(int outputIndex, Integer numberOfOutputChannels) {
+ config.setInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, numberOfOutputChannels);
+ }
+
+ public int getNumberOfOutputChannels(int outputIndex) {
+ return config.getInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, 0);
+ }
+
+ public <T extends Tuple> void setPartitioner(int outputIndex,
+ StreamPartitioner<T> partitionerObject) {
+
+ config.setBytes(PARTITIONER_OBJECT + outputIndex,
+ SerializationUtils.serialize(partitionerObject));
+ }
+
+ public <T extends Tuple> StreamPartitioner<T> getPartitioner(int outputIndex)
+ throws ClassNotFoundException, IOException {
+ return deserializeObject(config.getBytes(PARTITIONER_OBJECT + outputIndex,
+ SerializationUtils.serialize(new ShufflePartitioner<T>())));
+ }
+
+ public void setOutputName(int outputIndex, String outputName) {
+ if (outputName != null) {
+ config.setString(OUTPUT_NAME + outputIndex, outputName);
+ }
+ }
+
+ public String getOutputName(int outputIndex) {
+ return config.getString(OUTPUT_NAME + outputIndex, null);
+ }
+
+ public void setNumberOfInputs(int numberOfInputs) {
+ config.setInteger(NUMBER_OF_INPUTS, numberOfInputs);
+ }
+
+ public int getNumberOfInputs() {
+ return config.getInteger(NUMBER_OF_INPUTS, 0);
+ }
+
+ public void setNumberOfOutputs(int numberOfOutputs) {
+ config.setInteger(NUMBER_OF_OUTPUTS, numberOfOutputs);
+ }
+
+ public int getNumberOfOutputs() {
+ return config.getInteger(NUMBER_OF_OUTPUTS, 0);
+ }
+
+ public void setInputType(int inputNumber, Integer inputTypeNumber) {
+ config.setInteger(INPUT_TYPE + inputNumber++, inputTypeNumber);
+ }
+
+ public int getInputType(int inputNumber) {
+ return config.getInteger(INPUT_TYPE + inputNumber, 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected static <T> T deserializeObject(byte[] serializedObject) throws IOException,
+ ClassNotFoundException {
+ return (T) SerializationUtils.deserialize(serializedObject);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index 8afbddf..775e722 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -29,20 +29,19 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.MutableReader;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.util.ReaderIterator;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.collector.StreamCollector;
import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
@@ -55,7 +54,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
protected StreamRecordSerializer<OUT> outTupleSerializer = null;
protected SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
- protected Configuration configuration;
+ protected StreamConfig configuration;
protected StreamCollector<OUT> collector;
protected int instanceID;
protected String name;
@@ -68,19 +67,13 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
}
protected void initialize() {
- configuration = getTaskConfiguration();
- name = configuration.getString("componentName", "MISSING_COMPONENT_NAME");
+ configuration = new StreamConfig(getTaskConfiguration());
+ name = configuration.getComponentName();
}
protected Collector<OUT> setCollector() {
- if (configuration.getBoolean("directedEmit", false)) {
- OutputSelector<OUT> outputSelector = null;
- try {
- outputSelector = deserializeObject(configuration.getBytes("outputSelector", null));
- } catch (Exception e) {
- throw new StreamComponentException(
- "Cannot deserialize and instantiate OutputSelector", e);
- }
+ if (configuration.getDirectedEmit()) {
+ OutputSelector<OUT> outputSelector = configuration.getOutputSelector();
collector = new DirectedStreamCollector<OUT>(instanceID, outSerializationDelegate,
outputSelector);
@@ -102,7 +95,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
protected void setConfigOutputs(
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
- int numberOfOutputs = configuration.getInteger("numberOfOutputs", 0);
+ int numberOfOutputs = configuration.getNumberOfOutputs();
for (int i = 0; i < numberOfOutputs; i++) {
setPartitioner(i, outputs);
@@ -111,17 +104,14 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
private void setPartitioner(int outputNumber,
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
-
- byte[] serializedPartitioner = configuration.getBytes("partitionerObject_" + outputNumber,
- SerializationUtils.serialize((new ShufflePartitioner<OUT>())));
StreamPartitioner<OUT> outputPartitioner = null;
-
+
try {
- outputPartitioner = deserializeObject(serializedPartitioner);
+ outputPartitioner = configuration.getPartitioner(outputNumber);
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
- long bufferTimeout = configuration.getLong("bufferTimeout", 0);
+ long bufferTimeout = configuration.getBufferTimeout();
if (bufferTimeout > 0) {
output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(this,
@@ -132,7 +122,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
}
outputs.add(output);
- String outputName = configuration.getString("outputName_" + outputNumber, null);
+ String outputName = configuration.getOutputName(outputNumber);
if (collector != null) {
collector.addOutput(output, outputName);
@@ -143,7 +133,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
+ " with " + outputNumber + " outputs");
}
} catch (Exception e) {
- throw new StreamComponentException("Cannot deserialize "
+ throw new StreamComponentException("Cannot deserialize partitioner "
+ outputPartitioner.getClass().getSimpleName() + " of " + name + " with "
+ outputNumber + " outputs", e);
}
@@ -158,22 +148,9 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
*/
protected StreamComponentInvokable getInvokable(
Class<? extends StreamComponentInvokable> userFunctionClass) {
- StreamComponentInvokable userFunction = null;
-
- byte[] userFunctionSerialized = configuration.getBytes("serializedudf", null);
- this.isMutable = configuration.getBoolean("isMutable", false);
-
- try {
- userFunction = deserializeObject(userFunctionSerialized);
- } catch (ClassNotFoundException e) {
- new StreamComponentException("Cannot instantiate user function: "
- + userFunctionClass.getSimpleName());
- } catch (IOException e) {
- new StreamComponentException("Cannot instantiate user function: "
- + userFunctionClass.getSimpleName());
- }
-
- return userFunction;
+
+ this.isMutable = configuration.getMutability();
+ return configuration.getUserInvokableObject();
}
protected <IN extends Tuple> MutableObjectIterator<StreamRecord<IN>> createInputIterator(
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index 60a8152..6a9c897 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -19,8 +19,6 @@
package org.apache.flink.streaming.api.streamcomponent;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -39,7 +37,6 @@ import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.MutableObjectIterator;
@@ -58,7 +55,7 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
private CoInvokable<IN1, IN2, OUT> userFunction;
- private int[] numberOfOutputChannels;
+// private int[] numberOfOutputChannels;
private static int numTasks;
public CoStreamTask() {
@@ -70,21 +67,16 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
}
protected void setSerializers() {
- byte[] operatorBytes = configuration.getBytes("operator", null);
- String operatorName = configuration.getString("operatorName", "");
+ String operatorName = configuration.getFunctionName();
- Object function = null;
+ Object function = configuration.getFunction();
try {
- ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
- function = in.readObject();
-
if (operatorName.equals("coMap")) {
setSerializer(function, CoMapFunction.class, 2);
setDeserializers(function, CoMapFunction.class);
} else {
throw new Exception("Wrong operator name!");
}
-
} catch (Exception e) {
throw new StreamComponentException(e);
}
@@ -119,10 +111,10 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
setConfigOutputs(outputs);
- numberOfOutputChannels = new int[outputs.size()];
- for (int i = 0; i < numberOfOutputChannels.length; i++) {
- numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
- }
+// numberOfOutputChannels = new int[outputs.size()];
+// for (int i = 0; i < numberOfOutputChannels.length; i++) {
+// numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
+// }
setInvokable();
}
@@ -131,21 +123,21 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
@Override
protected void setInvokable() {
// Default value is a CoMapInvokable
- Class<? extends CoInvokable> userFunctionClass = configuration.getClass("userfunction",
- CoMapInvokable.class, CoInvokable.class);
+ Class<? extends CoInvokable> userFunctionClass = configuration.getUserInvokableClass();
+
userFunction = (CoInvokable<IN1, IN2, OUT>) getInvokable(userFunctionClass);
userFunction.initialize(collector, inputIter1, inTupleSerializer1, inputIter2,
inTupleSerializer2, isMutable);
}
protected void setConfigInputs() throws StreamComponentException {
- int numberOfInputs = configuration.getInteger("numberOfInputs", 0);
+ int numberOfInputs = configuration.getNumberOfInputs();
ArrayList<MutableRecordReader<IOReadableWritable>> inputList1 = new ArrayList<MutableRecordReader<IOReadableWritable>>();
ArrayList<MutableRecordReader<IOReadableWritable>> inputList2 = new ArrayList<MutableRecordReader<IOReadableWritable>>();
for (int i = 0; i < numberOfInputs; i++) {
- int inputType = configuration.getInteger("inputType_" + i, 0);
+ int inputType = configuration.getInputType(i);
switch (inputType) {
case 1:
inputList1.add(new MutableRecordReader<IOReadableWritable>(this));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
index b49620f..86b26ff 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
@@ -19,9 +19,6 @@
package org.apache.flink.streaming.api.streamcomponent;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
-
import org.apache.flink.api.common.functions.AbstractFunction;
import org.apache.flink.api.java.functions.FilterFunction;
import org.apache.flink.api.java.functions.FlatMapFunction;
@@ -47,14 +44,10 @@ public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT e
protected StreamRecordSerializer<IN> inTupleSerializer = null;
protected void setSerializers() {
- byte[] operatorBytes = configuration.getBytes("operator", null);
- String operatorName = configuration.getString("operatorName", "");
+ String operatorName = configuration.getFunctionName();
- Object function = null;
+ Object function = configuration.getFunction();
try {
- ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
- function = in.readObject();
-
if (operatorName.equals("flatMap")) {
setSerializerDeserializer(function, FlatMapFunction.class);
} else if (operatorName.equals("map")) {
@@ -110,7 +103,7 @@ public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT e
@SuppressWarnings("unchecked")
protected MutableReader<IOReadableWritable> getConfigInputs() throws StreamComponentException {
- int numberOfInputs = configuration.getInteger("numberOfInputs", 0);
+ int numberOfInputs = configuration.getNumberOfInputs();
if (numberOfInputs < 2) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 1b25285..224fdfb 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -53,7 +53,7 @@ public class StreamIterationSink<IN extends Tuple> extends SingleInputAbstractSt
setSinkSerializer();
inputs = getConfigInputs();
inputIter = createInputIterator(inputs, inTupleSerializer);
- iterationId = configuration.getString("iteration-id", "iteration-0");
+ iterationId = configuration.getIterationId();
dataChannel = BlockingQueueBroker.instance().get(iterationId);
} catch (Exception e) {
throw new StreamComponentException(String.format(
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index f880470..37e8e0f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -38,7 +38,7 @@ public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstrac
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
private static int numSources;
- private int[] numberOfOutputChannels;
+// private int[] numberOfOutputChannels;
private String iterationId;
@SuppressWarnings("rawtypes")
private BlockingQueue<StreamRecord> dataChannel;
@@ -63,12 +63,12 @@ public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstrac
throw new StreamComponentException("Cannot register outputs", e);
}
- numberOfOutputChannels = new int[outputs.size()];
- for (int i = 0; i < numberOfOutputChannels.length; i++) {
- numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
- }
+// numberOfOutputChannels = new int[outputs.size()];
+// for (int i = 0; i < numberOfOutputChannels.length; i++) {
+// numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
+// }
- iterationId = configuration.getString("iteration-id", "iteration-0");
+ iterationId = configuration.getIterationId();
try {
BlockingQueueBroker.instance().handIn(iterationId, dataChannel);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 5e3457c..8cbe0ea 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -61,8 +61,8 @@ public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamCompo
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void setInvokable() {
- Class<? extends SinkInvokable> userFunctionClass = configuration.getClass("userfunction",
- SinkInvokable.class, SinkInvokable.class);
+ Class<? extends SinkInvokable> userFunctionClass = configuration.getUserInvokableClass();
+
userFunction = (SinkInvokable<IN>) getInvokable(userFunctionClass);
userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index 12c9ba3..c4f38ce 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -37,7 +37,7 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
private UserSourceInvokable<OUT> userFunction;
private static int numSources;
- private int[] numberOfOutputChannels;
+// private int[] numberOfOutputChannels;
public StreamSource() {
@@ -60,10 +60,10 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
+ getClass().getSimpleName(), e);
}
- numberOfOutputChannels = new int[outputs.size()];
- for (int i = 0; i < numberOfOutputChannels.length; i++) {
- numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
- }
+// numberOfOutputChannels = new int[outputs.size()];
+// for (int i = 0; i < numberOfOutputChannels.length; i++) {
+// numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
+// }
setInvokable();
}
@@ -72,8 +72,8 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
@Override
protected void setInvokable() {
// Default value is a TaskInvokable even if it was called from a source
- Class<? extends UserSourceInvokable> userFunctionClass = configuration.getClass(
- "userfunction", UserSourceInvokable.class, UserSourceInvokable.class);
+ Class<? extends UserSourceInvokable> userFunctionClass = configuration.getUserInvokableClass();
+// .getClass("userfunction", UserSourceInvokable.class, UserSourceInvokable.class);
userFunction = (UserSourceInvokable<OUT>) getInvokable(userFunctionClass);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 5032446..12e064f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -43,7 +43,7 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
MutableObjectIterator<StreamRecord<IN>> inputIter;
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
private StreamRecordInvokable<IN, OUT> userFunction;
- private int[] numberOfOutputChannels;
+// private int[] numberOfOutputChannels;
private static int numTasks;
public StreamTask() {
@@ -65,10 +65,10 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
inputIter = createInputIterator(inputs, inTupleSerializer);
- numberOfOutputChannels = new int[outputs.size()];
- for (int i = 0; i < numberOfOutputChannels.length; i++) {
- numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
- }
+// numberOfOutputChannels = new int[outputs.size()];
+// for (int i = 0; i < numberOfOutputChannels.length; i++) {
+// numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
+// }
setInvokable();
}
@@ -77,8 +77,7 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
@Override
protected void setInvokable() {
// Default value is a TaskInvokable even if it was called from a source
- Class<? extends UserTaskInvokable> userFunctionClass = configuration.getClass(
- "userfunction", UserTaskInvokable.class, UserTaskInvokable.class);
+ Class<? extends UserTaskInvokable> userFunctionClass = configuration.getUserInvokableClass();
userFunction = (UserTaskInvokable<IN, OUT>) getInvokable(userFunctionClass);
userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamWindowTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamWindowTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamWindowTask.java
deleted file mode 100644
index c044cc3..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamWindowTask.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import java.util.ArrayList;
-
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.state.SlidingWindowState;
-import org.apache.flink.streaming.state.StateManager;
-import org.apache.flink.util.Collector;
-
-public class StreamWindowTask<InTuple extends Tuple, OutTuple extends Tuple>
- extends FlatMapFunction<InTuple, OutTuple> {
- private static final long serialVersionUID = 1L;
-
- private int computeGranularity;
- private int windowFieldId;
-
- private ArrayList<InTuple> tempTupleArray;
- private SlidingWindowState<InTuple> window;
- private long initTimestamp = -1;
- private long nextTimestamp = -1;
-
- protected StateManager checkpointer = new StateManager("object.out", 1000);
-
- public StreamWindowTask(int windowSize, int slidingStep,
- int computeGranularity, int windowFieldId) {
- this.computeGranularity = computeGranularity;
- this.windowFieldId = windowFieldId;
- window = new SlidingWindowState<InTuple>(windowSize, slidingStep,
- computeGranularity);
- checkpointer.registerState(window);
- Thread t = new Thread(checkpointer);
- t.start();
- }
-
- protected void incrementCompute(ArrayList<InTuple> tupleArray) {
- }
-
- protected void decrementCompute(ArrayList<InTuple> tupleArray) {
- }
-
- protected void produceOutput(long progress, Collector<OutTuple> out) {
- }
-
- @Override
- public void flatMap(InTuple value, Collector<OutTuple> out)
- throws Exception {
- long progress = (Long) value.getField(windowFieldId);
- if (initTimestamp == -1) {
- initTimestamp = progress;
- nextTimestamp = initTimestamp + computeGranularity;
- tempTupleArray = new ArrayList<InTuple>();
- } else {
- if (progress > nextTimestamp) {
- if (window.isFull()) {
- ArrayList<InTuple> expiredTupleArray = window.popFront();
- incrementCompute(tempTupleArray);
- decrementCompute(expiredTupleArray);
- window.pushBack(tempTupleArray);
- if (window.isEmittable()) {
- produceOutput(progress, out);
- }
- } else {
- incrementCompute(tempTupleArray);
- window.pushBack(tempTupleArray);
- if (window.isFull()) {
- produceOutput(progress, out);
- }
- }
- initTimestamp = nextTimestamp;
- nextTimestamp = initTimestamp + computeGranularity;
- tempTupleArray = new ArrayList<InTuple>();
- }
- tempTupleArray.add(value);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumAggregate.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumAggregate.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumAggregate.java
deleted file mode 100644
index 12ada98..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumAggregate.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.sum;
-
-import java.util.ArrayList;
-
-import org.apache.flink.streaming.api.streamcomponent.StreamWindowTask;
-import org.apache.flink.streaming.state.TableState;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class WindowSumAggregate extends
- StreamWindowTask<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
- private static final long serialVersionUID = -2832409561059237150L;
- private TableState<String, Integer> sum;
- private Tuple2<Integer, Long> outTuple = new Tuple2<Integer, Long>();
-
-
- public WindowSumAggregate(int windowSize, int slidingStep,
- int computeGranularity, int windowFieldId) {
- super(windowSize, slidingStep, computeGranularity, windowFieldId);
- sum = new TableState<String, Integer>();
- sum.put("sum", 0);
- checkpointer.registerState(sum);
- }
-
- @Override
- protected void incrementCompute(ArrayList<Tuple2<Integer, Long>> tupleArray) {
- for (int i = 0; i < tupleArray.size(); ++i) {
- int number = tupleArray.get(i).f0;
- sum.put("sum", sum.get("sum") + number);
- }
- }
-
- @Override
- protected void decrementCompute(ArrayList<Tuple2<Integer, Long>> tupleArray) {
- for (int i = 0; i < tupleArray.size(); ++i) {
- int number = tupleArray.get(i).f0;
- sum.put("sum", sum.get("sum") - number);
- }
- }
-
- @Override
- protected void produceOutput(long progress, Collector<Tuple2<Integer, Long>> out){
- outTuple.f0 = sum.get("sum");
- outTuple.f1 = progress;
- out.collect(outTuple);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumLocal.java
deleted file mode 100644
index c8390f0..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumLocal.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.sum;
-
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-public class WindowSumLocal {
-
- private static final int PARALLELISM = 1;
- private static final int SOURCE_PARALLELISM = 1;
-
- public static void main(String[] args) {
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(PARALLELISM);
-
- @SuppressWarnings("unused")
- DataStream<Tuple2<Integer, Long>> dataStream = env
- .addSource(new WindowSumSource(), SOURCE_PARALLELISM).map(new WindowSumMultiple())
- .flatMap(new WindowSumAggregate(100, 20, 10, 1)).addSink(new WindowSumSink());
-
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumMultiple.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumMultiple.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumMultiple.java
deleted file mode 100644
index 8d4ac0c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumMultiple.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.sum;
-
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-public class WindowSumMultiple extends MapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
- private static final long serialVersionUID = 1L;
-
- private Tuple2<Integer, Long> outTuple = new Tuple2<Integer, Long>();
-
- @Override
- public Tuple2<Integer, Long> map(Tuple2<Integer, Long> inTuple) throws Exception {
- outTuple.f0 = inTuple.f0 * 2;
- outTuple.f1 = inTuple.f1;
- return outTuple;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSink.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSink.java
deleted file mode 100644
index 9c4547a..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSink.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.sum;
-
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-public class WindowSumSink extends SinkFunction<Tuple2<Integer, Long>> {
- private static final long serialVersionUID = 1L;
- @Override
- public void invoke(Tuple2<Integer, Long> inTuple) {
- System.out.println(inTuple);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSource.java
deleted file mode 100644
index 12eda38..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSource.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.sum;
-
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class WindowSumSource extends SourceFunction<Tuple2<Integer, Long>> {
- private static final long serialVersionUID = 1L;
-
- private Tuple2<Integer, Long> outRecord = new Tuple2<Integer, Long>();
- private Long timestamp = 0L;
-
- @Override
- public void invoke(Collector<Tuple2<Integer, Long>> collector) throws Exception {
- for (int i = 0; i < 1000; ++i) {
- outRecord.f0 = i;
- outRecord.f1 = timestamp;
- collector.collect(outRecord);
- timestamp++;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountCounter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountCounter.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountCounter.java
deleted file mode 100644
index dd99271..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountCounter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.wordcount;
-
-import java.util.ArrayList;
-
-import org.apache.flink.streaming.api.streamcomponent.StreamWindowTask;
-import org.apache.flink.streaming.state.TableState;
-import org.apache.flink.streaming.state.TableStateIterator;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.util.Collector;
-
-public class WindowWordCountCounter extends
- StreamWindowTask<Tuple2<String, Long>, Tuple3<String, Integer, Long>> {
- private static final long serialVersionUID = 1L;
-
- private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
- private TableState<String, Integer> wordCounts;
-
- public WindowWordCountCounter(int windowSize, int slidingStep,
- int computeGranularity, int windowFieldId) {
- super(windowSize, slidingStep, computeGranularity, windowFieldId);
- wordCounts = new TableState<String, Integer>();
- }
-
- @Override
- protected void incrementCompute(ArrayList<Tuple2<String, Long>> tupleArray) {
- for (int i = 0; i < tupleArray.size(); ++i) {
- String word = tupleArray.get(i).f0;
- if (wordCounts.containsKey(word)) {
- int count = wordCounts.get(word) + 1;
- wordCounts.put(word, count);
- } else {
- wordCounts.put(word, 1);
- }
- }
- }
-
- @Override
- protected void decrementCompute(ArrayList<Tuple2<String, Long>> tupleArray) {
- for (int i = 0; i < tupleArray.size(); ++i) {
- String word = tupleArray.get(i).f0;
- int count = wordCounts.get(word) - 1;
- if (count == 0) {
- wordCounts.delete(word);
- } else {
- wordCounts.put(word, count);
- }
- }
- }
-
- @Override
- protected void produceOutput(long progress, Collector<Tuple3<String, Integer, Long>> out) {
- TableStateIterator<String, Integer> iterator = wordCounts.getIterator();
- while (iterator.hasNext()) {
- Tuple2<String, Integer> tuple = iterator.next();
- outTuple.f0 = tuple.f0;
- outTuple.f1 = tuple.f1;
- outTuple.f2 = progress;
- out.collect(outTuple);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountLocal.java
deleted file mode 100644
index 2ef5af9..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountLocal.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.wordcount;
-
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestDataUtil;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-
-public class WindowWordCountLocal {
-
- private static final int PARALLELISM = 1;
-
- // This example will count the occurrence of each word in the input file with a sliding window.
-
- public static void main(String[] args) {
-
- TestDataUtil.downloadIfNotExists("hamlet.txt");
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM);
-
- @SuppressWarnings("unused")
- DataStream<Tuple3<String, Integer, Long>> dataStream = env
- .readTextStream("src/test/resources/testdata/hamlet.txt")
- .flatMap(new WindowWordCountSplitter())
- .partitionBy(0)
- .flatMap(new WindowWordCountCounter(10, 2, 1, 1))
- .addSink(new WindowWordCountSink());
-
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSink.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSink.java
deleted file mode 100644
index f8c009f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSink.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.wordcount;
-
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-public class WindowWordCountSink extends SinkFunction<Tuple3<String, Integer, Long>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void invoke(Tuple3<String, Integer, Long> inTuple) {
- System.out.println(inTuple);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSplitter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSplitter.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSplitter.java
deleted file mode 100644
index 19a533e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSplitter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.wordcount;
-
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class WindowWordCountSplitter extends FlatMapFunction<Tuple1<String>, Tuple2<String, Long>> {
- private static final long serialVersionUID = 1L;
-
- private String[] words = new String[] {};
- private Long timestamp = 0L;
- private Tuple2<String, Long> outTuple = new Tuple2<String, Long>();
-
- // Splits the lines according to the spaces. And adds the line's timestamp to them.
- @Override
- public void flatMap(Tuple1<String> inTuple, Collector<Tuple2<String, Long>> out) throws Exception {
-
- words=inTuple.f0.split(" ");
- timestamp=System.currentTimeMillis();
- for(String word : words){
- outTuple.f0 = word;
- outTuple.f1 = timestamp;
- out.collect(outTuple);
- }
- }
-}
\ No newline at end of file