You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:54 UTC
[13/19] flink git commit: [streaming] Major internal renaming and restructure
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
new file mode 100644
index 0000000..b688ea4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer;
+import org.apache.flink.streaming.api.operators.windowing.WindowFlattener;
+import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
+import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
+
+public class WindowingOptimizer {
+
+ public static void optimizeGraph(StreamGraph streamGraph) {
+
+ // Share common discretizers
+ setDiscretizerReuse(streamGraph);
+
+ // Remove unnecessary merges before flatten operators
+ removeMergeBeforeFlatten(streamGraph);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static void removeMergeBeforeFlatten(StreamGraph streamGraph) {
+ Set<Tuple2<Integer, StreamOperator<?, ?>>> operators = streamGraph.getOperators();
+ List<Integer> flatteners = new ArrayList<Integer>();
+
+ for (Tuple2<Integer, StreamOperator<?, ?>> entry : operators) {
+ if (entry.f1 instanceof WindowFlattener) {
+ flatteners.add(entry.f0);
+ }
+ }
+
+ for (Integer flattenerID : flatteners) {
+ // Flatteners should have exactly one input
+ StreamNode input = streamGraph.getVertex(flattenerID).getInEdges().get(0)
+ .getSourceVertex();
+
+ // Check whether the flatten is applied after a merge
+ if (input.getOperator() instanceof WindowMerger) {
+
+ // Mergers should have exactly one input
+ StreamNode mergeInput = input.getInEdges().get(0).getSourceVertex();
+
+ // We connect the merge input to the flattener directly
+ streamGraph.addEdge(mergeInput.getID(), flattenerID,
+ new DistributePartitioner(true), 0, new ArrayList<String>());
+
+ // If the merger is only connected to the flattener we delete it
+ // completely, otherwise we only remove the edge
+ if (input.getOutEdges().size() > 1) {
+ streamGraph.removeEdge(streamGraph.getEdge(input.getID(), flattenerID));
+ } else {
+ streamGraph.removeVertex(input);
+ }
+
+ streamGraph.setParallelism(flattenerID, mergeInput.getParallelism());
+ }
+ }
+
+ }
+
+ private static void setDiscretizerReuse(StreamGraph streamGraph) {
+
+ Set<Tuple2<Integer, StreamOperator<?, ?>>> operators = streamGraph.getOperators();
+ List<Tuple2<Integer, StreamDiscretizer<?>>> discretizers = new ArrayList<Tuple2<Integer, StreamDiscretizer<?>>>();
+
+ // Get the discretizers
+ for (Tuple2<Integer, StreamOperator<?, ?>> entry : operators) {
+ if (entry.f1 instanceof StreamDiscretizer) {
+ discretizers.add(new Tuple2<Integer, StreamDiscretizer<?>>(entry.f0,
+ (StreamDiscretizer<?>) entry.f1));
+ }
+ }
+
+ List<Tuple2<StreamDiscretizer<?>, List<Integer>>> matchingDiscretizers = new ArrayList<Tuple2<StreamDiscretizer<?>, List<Integer>>>();
+
+ for (Tuple2<Integer, StreamDiscretizer<?>> discretizer : discretizers) {
+ boolean inMatching = false;
+ for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : matchingDiscretizers) {
+ Set<Integer> discretizerInEdges = new HashSet<Integer>(streamGraph.getVertex(
+ discretizer.f0).getInEdgeIndices());
+ Set<Integer> matchingInEdges = new HashSet<Integer>(streamGraph.getVertex(
+ matching.f1.get(0)).getInEdgeIndices());
+
+ if (discretizer.f1.equals(matching.f0)
+ && discretizerInEdges.equals(matchingInEdges)) {
+ matching.f1.add(discretizer.f0);
+ inMatching = true;
+ break;
+ }
+ }
+ if (!inMatching) {
+ List<Integer> matchingNames = new ArrayList<Integer>();
+ matchingNames.add(discretizer.f0);
+ matchingDiscretizers.add(new Tuple2<StreamDiscretizer<?>, List<Integer>>(
+ discretizer.f1, matchingNames));
+ }
+ }
+
+ for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : matchingDiscretizers) {
+ List<Integer> matchList = matching.f1;
+ if (matchList.size() > 1) {
+ Integer first = matchList.get(0);
+ for (int i = 1; i < matchList.size(); i++) {
+ replaceDiscretizer(streamGraph, matchList.get(i), first);
+ }
+ }
+ }
+ }
+
+ private static void replaceDiscretizer(StreamGraph streamGraph, Integer toReplaceID,
+ Integer replaceWithID) {
+ // Copy the out edges into a new list
+ List<StreamEdge> outEdges = new ArrayList<StreamEdge>(streamGraph.getVertex(toReplaceID)
+ .getOutEdges());
+
+ int numOutputs = outEdges.size();
+
+ // Reconnect outputs
+ for (int i = 0; i < numOutputs; i++) {
+ StreamEdge outEdge = outEdges.get(i);
+
+ streamGraph.addEdge(replaceWithID, outEdge.getTargetID(), outEdge.getPartitioner(), 0,
+ new ArrayList<String>());
+ }
+
+ // Remove the other discretizer
+ streamGraph.removeVertex(streamGraph.getVertex(toReplaceID));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
deleted file mode 100644
index 4e09a98..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
-
-public abstract class ChainableInvokable<IN, OUT> extends StreamInvokable<IN, OUT> implements
- Collector<IN> {
-
- private static final long serialVersionUID = 1L;
- private boolean copyInput = true;
-
- public ChainableInvokable(Function userFunction) {
- super(userFunction);
- setChainingStrategy(ChainingStrategy.ALWAYS);
- }
-
- public void setup(Collector<OUT> collector, StreamRecordSerializer<IN> inSerializer) {
- this.collector = collector;
- this.inSerializer = inSerializer;
- this.objectSerializer = inSerializer.getObjectSerializer();
- }
-
- public ChainableInvokable<IN, OUT> withoutInputCopy() {
- copyInput = false;
- return this;
- }
-
- protected IN copyInput(IN input) {
- return copyInput ? copy(input) : input;
- }
-
- @Override
- public void collect(IN record) {
- if (isRunning) {
- nextObject = copyInput(record);
- callUserFunctionAndLogException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
deleted file mode 100644
index 29d2ed2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-
-public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
- private static final long serialVersionUID = 1L;
-
- private SinkFunction<IN> sinkFunction;
-
- public SinkInvokable(SinkFunction<IN> sinkFunction) {
- super(sinkFunction);
- this.sinkFunction = sinkFunction;
- }
-
- @Override
- public void invoke() throws Exception {
- while (isRunning && readNext() != null) {
- callUserFunctionAndLogException();
- }
- }
-
- @Override
- protected void callUserFunction() throws Exception {
- sinkFunction.invoke(nextObject);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
deleted file mode 100644
index c3f25a0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-
-public class SourceInvokable<OUT> extends StreamInvokable<OUT, OUT> implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private SourceFunction<OUT> sourceFunction;
-
- public SourceInvokable(SourceFunction<OUT> sourceFunction) {
- super(sourceFunction);
- this.sourceFunction = sourceFunction;
- }
-
- @Override
- public void invoke() {
- callUserFunctionAndLogException();
- }
-
- @Override
- protected void callUserFunction() throws Exception {
- sourceFunction.run(collector);
- }
-
- @Override
- public void cancel() {
- super.cancel();
- sourceFunction.cancel();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
deleted file mode 100644
index 6281de3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
-import org.apache.flink.streaming.io.IndexedReaderIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The StreamInvokable represents the base class for all invokables in the
- * streaming topology.
- *
- * @param <OUT>
- * The output type of the invokable
- */
-public abstract class StreamInvokable<IN, OUT> implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(StreamInvokable.class);
-
- protected StreamTaskContext<OUT> taskContext;
-
- protected ExecutionConfig executionConfig = null;
-
- protected IndexedReaderIterator<StreamRecord<IN>> recordIterator;
- protected StreamRecordSerializer<IN> inSerializer;
- protected TypeSerializer<IN> objectSerializer;
- protected StreamRecord<IN> nextRecord;
- protected IN nextObject;
- protected boolean isMutable;
-
- public Collector<OUT> collector;
- protected Function userFunction;
- protected volatile boolean isRunning;
-
- private ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
-
- public StreamInvokable(Function userFunction) {
- this.userFunction = userFunction;
- }
-
- /**
- * Initializes the {@link StreamInvokable} for input and output handling
- *
- * @param taskContext
- * StreamTaskContext representing the vertex
- */
- public void setup(StreamTaskContext<OUT> taskContext) {
- this.collector = taskContext.getOutputCollector();
- this.recordIterator = taskContext.getIndexedInput(0);
- this.inSerializer = taskContext.getInputSerializer(0);
- if (this.inSerializer != null) {
- this.nextRecord = inSerializer.createInstance();
- this.objectSerializer = inSerializer.getObjectSerializer();
- }
- this.taskContext = taskContext;
- this.executionConfig = taskContext.getExecutionConfig();
- }
-
- /**
- * Method that will be called when the operator starts, should encode the
- * processing logic
- */
- public abstract void invoke() throws Exception;
-
- /*
- * Reads the next record from the reader iterator and stores it in the
- * nextRecord variable
- */
- protected StreamRecord<IN> readNext() throws IOException {
- this.nextRecord = inSerializer.createInstance();
- try {
- nextRecord = recordIterator.next(nextRecord);
- try {
- nextObject = nextRecord.getObject();
- } catch (NullPointerException e) {
- // end of stream
- }
- return nextRecord;
- } catch (IOException e) {
- if (isRunning) {
- throw new RuntimeException("Could not read next record due to: "
- + StringUtils.stringifyException(e));
- } else {
- // Task already cancelled do nothing
- return null;
- }
- } catch (IllegalStateException e) {
- if (isRunning) {
- throw new RuntimeException("Could not read next record due to: "
- + StringUtils.stringifyException(e));
- } else {
- // Task already cancelled do nothing
- return null;
- }
- }
- }
-
- /**
- * The call of the user implemented function should be implemented here
- */
- protected void callUserFunction() throws Exception {
- }
-
- /**
- * Method for logging exceptions thrown during the user function call
- */
- protected void callUserFunctionAndLogException() {
- try {
- callUserFunction();
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Calling user function failed due to: {}",
- StringUtils.stringifyException(e));
- }
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Open method to be used if the user defined function extends the
- * RichFunction class
- *
- * @param parameters
- * The configuration parameters for the operator
- */
- public void open(Configuration parameters) throws Exception {
- isRunning = true;
- FunctionUtils.openFunction(userFunction, parameters);
- }
-
- /**
- * Close method to be used if the user defined function extends the
- * RichFunction class
- *
- */
- public void close() {
- isRunning = false;
- collector.close();
- try {
- FunctionUtils.closeFunction(userFunction);
- } catch (Exception e) {
- throw new RuntimeException("Error when closing the function: " + e.getMessage());
- }
- }
-
- public void cancel() {
- isRunning = false;
- }
-
- public void setRuntimeContext(RuntimeContext t) {
- FunctionUtils.setFunctionRuntimeContext(userFunction, t);
- }
-
- protected IN copy(IN record) {
- return objectSerializer.copy(record);
- }
-
- public void setChainingStrategy(ChainingStrategy strategy) {
- if (strategy == ChainingStrategy.ALWAYS) {
- if (!(this instanceof ChainableInvokable)) {
- throw new RuntimeException(
- "Invokable needs to extend ChainableInvokable to be chained");
- }
- }
- this.chainingStrategy = strategy;
- }
-
- public ChainingStrategy getChainingStrategy() {
- return chainingStrategy;
- }
-
- public static enum ChainingStrategy {
- ALWAYS, NEVER, HEAD;
- }
-
- public Function getUserFunction() {
- return userFunction;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
deleted file mode 100644
index 8bb546c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-
-public class CounterInvokable<IN> extends ChainableInvokable<IN, Long> {
- private static final long serialVersionUID = 1L;
-
- Long count = 0L;
-
- public CounterInvokable() {
- super(null);
- }
-
- @Override
- public void invoke() throws Exception {
- while (isRunning && readNext() != null) {
- collector.collect(++count);
- }
- }
-
- @Override
- public void collect(IN record) {
- if (isRunning) {
- collector.collect(++count);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
deleted file mode 100644
index 00d432b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-
-public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
-
- private static final long serialVersionUID = 1L;
-
- FilterFunction<IN> filterFunction;
- private boolean collect;
-
- public FilterInvokable(FilterFunction<IN> filterFunction) {
- super(filterFunction);
- this.filterFunction = filterFunction;
- }
-
- @Override
- public void invoke() throws Exception {
- while (isRunning && readNext() != null) {
- callUserFunctionAndLogException();
- }
- }
-
- @Override
- protected void callUserFunction() throws Exception {
- collect = filterFunction.filter(nextObject);
- if (collect) {
- collector.collect(nextObject);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
deleted file mode 100644
index dfead14..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-
-public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
- private static final long serialVersionUID = 1L;
-
- private FlatMapFunction<IN, OUT> flatMapper;
-
- public FlatMapInvokable(FlatMapFunction<IN, OUT> flatMapper) {
- super(flatMapper);
- this.flatMapper = flatMapper;
- }
-
- @Override
- public void invoke() throws Exception {
- while (isRunning && readNext() != null) {
- callUserFunctionAndLogException();
- }
- }
-
- @Override
- protected void callUserFunction() throws Exception {
- flatMapper.flatMap(nextObject, collector);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
deleted file mode 100644
index 4a0f4f5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-
-public class GroupedFoldInvokable<IN, OUT> extends StreamFoldInvokable<IN, OUT> {
- private static final long serialVersionUID = 1L;
-
- private KeySelector<IN, ?> keySelector;
- private Map<Object, OUT> values;
- private OUT initialValue;
-
- public GroupedFoldInvokable(FoldFunction<IN, OUT> folder, KeySelector<IN, ?> keySelector,
- OUT initialValue, TypeInformation<OUT> outTypeInformation) {
- super(folder, initialValue, outTypeInformation);
- this.keySelector = keySelector;
- this.initialValue = initialValue;
- values = new HashMap<Object, OUT>();
- }
-
- @Override
- protected void callUserFunction() throws Exception {
- Object key = nextRecord.getKey(keySelector);
- OUT accumulator = values.get(key);
- if (accumulator != null) {
- OUT folded = folder.fold(outTypeSerializer.copy(accumulator), nextObject);
- values.put(key, folded);
- collector.collect(folded);
- } else {
- OUT first = folder.fold(outTypeSerializer.copy(initialValue), nextObject);
- values.put(key, first);
- collector.collect(first);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
deleted file mode 100644
index 72f52ea..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-
-public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> { // reduce that keeps one running value per key
- private static final long serialVersionUID = 1L;
-
- private KeySelector<IN, ?> keySelector; // computes the grouping key from nextObject
- private Map<Object, IN> values; // latest reduced value for every key seen so far
- // Passes the reducer to the superclass and starts with an empty per-key map.
- public GroupedReduceInvokable(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) {
- super(reducer);
- this.keySelector = keySelector;
- values = new HashMap<Object, IN>();
- }
- // Combines nextObject with the value stored for its key (if any), stores the outcome and emits it.
- @Override
- protected void callUserFunction() throws Exception {
- Object key = keySelector.getKey(nextObject);
- IN currentValue = values.get(key);
- if (currentValue != null) {
- IN reduced = reducer.reduce(copy(currentValue), nextObject); // copy(...) is defined outside this file; presumably duplicates the stored value - TODO confirm
- values.put(key, reduced);
- collector.collect(reduced);
- } else {
- values.put(key, nextObject); // first element of the key: stored and emitted without calling the reducer
- collector.collect(nextObject);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
deleted file mode 100644
index 53cb825..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-
-public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> { // applies a MapFunction to each input and emits the result
- private static final long serialVersionUID = 1L;
-
- private MapFunction<IN, OUT> mapper; // user function called once per input element
- // Registers the mapper with the superclass and keeps a typed reference to it.
- public MapInvokable(MapFunction<IN, OUT> mapper) {
- super(mapper);
- this.mapper = mapper;
- }
- // Runs until isRunning is false or readNext() returns null, delegating each record to callUserFunctionAndLogException().
- @Override
- public void invoke() throws Exception {
- while (isRunning && readNext() != null) {
- callUserFunctionAndLogException();
- }
- }
- // Emits mapper.map(nextObject) through the collector.
- @Override
- protected void callUserFunction() throws Exception {
- collector.collect(mapper.map(nextObject));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
deleted file mode 100644
index bc58188..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-
-public class ProjectInvokable<IN, OUT extends Tuple> extends ChainableInvokable<IN, OUT> { // copies selected tuple fields into an output tuple
- private static final long serialVersionUID = 1L;
-
- transient OUT outTuple; // output instance created in open() and reused for every record
- TypeSerializer<OUT> outTypeSerializer; // created in open() from outTypeInformation
- TypeInformation<OUT> outTypeInformation; // type of the projected tuple, supplied by the caller
- int[] fields; // fields[i] is the input field index copied to output position i
- int numFields; // cached fields.length
- // Stores the projection; null is passed to the superclass as the user function.
- public ProjectInvokable(int[] fields, TypeInformation<OUT> outTypeInformation) {
- super(null);
- this.fields = fields;
- this.numFields = this.fields.length;
- this.outTypeInformation = outTypeInformation;
- }
- // Runs until isRunning is false or readNext() returns null, delegating each record to callUserFunctionAndLogException().
- @Override
- public void invoke() throws Exception {
- while (isRunning && readNext() != null) {
- callUserFunctionAndLogException();
- }
- }
- // Fills outTuple from the current input (cast to Tuple) and emits that same outTuple instance.
- @Override
- protected void callUserFunction() throws Exception {
- for (int i = 0; i < this.numFields; i++) {
- outTuple.setField(((Tuple)nextObject).getField(fields[i]), i);
- }
- collector.collect(outTuple);
- }
- // After the superclass open(), creates the output serializer and the reusable output tuple.
- @Override
- public void open(Configuration config) throws Exception {
- super.open(config);
- this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
- outTuple = outTypeSerializer.createInstance();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
deleted file mode 100644
index 1353c01..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-
-public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> { // folds every input into one running accumulator
- private static final long serialVersionUID = 1L;
-
- protected FoldFunction<IN, OUT> folder; // user fold function
- private OUT accumulator; // running result; starts as the initial value given to the constructor
- protected TypeSerializer<OUT> outTypeSerializer; // used to copy the accumulator before each fold call
- // Registers the folder with the superclass, seeds the accumulator and creates the output serializer.
- public StreamFoldInvokable(FoldFunction<IN, OUT> folder, OUT initialValue,
- TypeInformation<OUT> outTypeInformation) {
- super(folder);
- this.folder = folder;
- this.accumulator = initialValue;
- this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig); // NOTE(review): executionConfig is read during construction - confirm it is already set at this point
- }
- // Runs until isRunning is false or readNext() returns null, delegating each record to callUserFunctionAndLogException().
- @Override
- public void invoke() throws Exception {
- while (isRunning && readNext() != null) {
- callUserFunctionAndLogException();
- }
- }
- // Folds nextObject into a copy of the accumulator, keeps the result as the new accumulator and emits it.
- @Override
- protected void callUserFunction() throws Exception {
-
- accumulator = folder.fold(outTypeSerializer.copy(accumulator), nextObject);
- collector.collect(accumulator);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
deleted file mode 100644
index f0f378d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-
-public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> { // rolling reduce over all inputs with one running value
- private static final long serialVersionUID = 1L;
-
- protected ReduceFunction<IN> reducer; // user reduce function
- private IN currentValue; // running result; null until the first element has been processed
- // Registers the reducer with the superclass; the running value starts as null.
- public StreamReduceInvokable(ReduceFunction<IN> reducer) {
- super(reducer);
- this.reducer = reducer;
- currentValue = null;
- }
- // Runs until isRunning is false or readNext() returns null, delegating each record to callUserFunctionAndLogException().
- @Override
- public void invoke() throws Exception {
- while (isRunning && readNext() != null) {
- callUserFunctionAndLogException();
- }
- }
- // Updates the running value with nextObject and emits the running value.
- @Override
- protected void callUserFunction() throws Exception {
-
- if (currentValue != null) {
- currentValue = reducer.reduce(copy(currentValue), nextObject); // copy(...) is defined outside this file; presumably duplicates the running value - TODO confirm
- } else {
- currentValue = nextObject; // first element becomes the running value without calling the reducer
-
- }
- collector.collect(currentValue);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
deleted file mode 100644
index 4cbaebb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-
-public class CoFlatMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> { // routes each input to the matching flatMap method
- private static final long serialVersionUID = 1L;
-
- private CoFlatMapFunction<IN1, IN2, OUT> flatMapper; // user function with one flatMap method per input
- // Registers the function with the superclass and keeps a typed reference to it.
- public CoFlatMapInvokable(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
- super(flatMapper);
- this.flatMapper = flatMapper;
- }
- // Record from the first input: run callUserFunction1() through the logging wrapper.
- @Override
- public void handleStream1() throws Exception {
- callUserFunctionAndLogException1();
- }
- // Record from the second input: run callUserFunction2() through the logging wrapper.
- @Override
- public void handleStream2() throws Exception {
- callUserFunctionAndLogException2();
- }
- // Passes the object held by reuse1 and the collector to flatMap1.
- @Override
- protected void callUserFunction1() throws Exception {
- flatMapper.flatMap1(reuse1.getObject(), collector);
-
- }
- // Passes the object held by reuse2 and the collector to flatMap2.
- @Override
- protected void callUserFunction2() throws Exception {
- flatMapper.flatMap2(reuse2.getObject(), collector);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
deleted file mode 100644
index 4907ac5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-
-public class CoGroupedReduceInvokable<IN1, IN2, OUT> extends CoReduceInvokable<IN1, IN2, OUT> { // per-key reduce on each of two inputs
- private static final long serialVersionUID = 1L;
-
- protected KeySelector<IN1, ?> keySelector1; // key extraction for the first input
- protected KeySelector<IN2, ?> keySelector2; // key extraction for the second input
- private Map<Object, IN1> values1; // latest reduced value per key, first input
- private Map<Object, IN2> values2; // latest reduced value per key, second input
- IN1 reduced1; // written by callUserFunction1(), read back in handleStream1()
- IN2 reduced2; // written by callUserFunction2(), read back in handleStream2()
- // Stores both key selectors and starts with empty per-key maps for both inputs.
- public CoGroupedReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer,
- KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
- super(coReducer);
- this.coReducer = coReducer; // assigned here although the same reducer was just passed to super(...)
- this.keySelector1 = keySelector1;
- this.keySelector2 = keySelector2;
- values1 = new HashMap<Object, IN1>();
- values2 = new HashMap<Object, IN2>();
- }
- // First input: reduce into the value stored for the key (or store the element itself), then emit map1 of the stored value.
- @Override
- public void handleStream1() throws Exception {
- Object key = reuse1.getKey(keySelector1);
- currentValue1 = values1.get(key);
- nextValue1 = reuse1.getObject();
- if (currentValue1 != null) {
- callUserFunctionAndLogException1(); // sets reduced1 via callUserFunction1()
- values1.put(key, reduced1);
- collector.collect(coReducer.map1(reduced1));
- } else {
- values1.put(key, nextValue1); // first element of the key: no reduce call
- collector.collect(coReducer.map1(nextValue1));
- }
- }
- // Second input: same steps as handleStream1(), using values2, reduce2 and map2.
- @Override
- public void handleStream2() throws Exception {
- Object key = reuse2.getKey(keySelector2);
- currentValue2 = values2.get(key);
- nextValue2 = reuse2.getObject();
- if (currentValue2 != null) {
- callUserFunctionAndLogException2(); // sets reduced2 via callUserFunction2()
- values2.put(key, reduced2);
- collector.collect(coReducer.map2(reduced2));
- } else {
- values2.put(key, nextValue2); // first element of the key: no reduce call
- collector.collect(coReducer.map2(nextValue2));
- }
- }
- // Reduces the stored value with the new element; the stored value is passed as-is, without a copy.
- @Override
- protected void callUserFunction1() throws Exception {
- reduced1 = coReducer.reduce1(currentValue1, nextValue1);
-
- }
- // Reduces the stored value with the new element; the stored value is passed as-is, without a copy.
- @Override
- protected void callUserFunction2() throws Exception {
- reduced2 = coReducer.reduce2(currentValue2, nextValue2);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
deleted file mode 100644
index f727a32..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OUT> { // base class for invokables reading two inputs
- // Hands the user function to the superclass.
- public CoInvokable(Function userFunction) {
- super(userFunction);
- }
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(CoInvokable.class);
-
- protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator; // source of records for both inputs, set in setup()
- protected StreamRecord<IN1> reuse1; // record holder for the first input; replaced after each handled record
- protected StreamRecord<IN2> reuse2; // record holder for the second input; replaced after each handled record
- protected StreamRecordSerializer<IN1> srSerializer1; // input serializer 0 from the task context
- protected StreamRecordSerializer<IN2> srSerializer2; // input serializer 1 from the task context
- protected TypeSerializer<IN1> serializer1; // object serializer obtained from srSerializer1
- protected TypeSerializer<IN2> serializer2; // object serializer obtained from srSerializer2
- // Takes collector, co-reader and both input serializers from the task context and creates the initial record holders.
- @Override
- public void setup(StreamTaskContext<OUT> taskContext) {
- this.collector = taskContext.getOutputCollector();
-
- this.recordIterator = taskContext.getCoReader();
-
- this.srSerializer1 = taskContext.getInputSerializer(0);
- this.srSerializer2 = taskContext.getInputSerializer(1);
-
- this.reuse1 = srSerializer1.createInstance();
- this.reuse2 = srSerializer2.createInstance();
-
- this.serializer1 = srSerializer1.getObjectSerializer();
- this.serializer2 = srSerializer2.getObjectSerializer();
- }
- // Replaces both record holders with fresh instances.
- protected void resetReuseAll() {
- this.reuse1 = srSerializer1.createInstance();
- this.reuse2 = srSerializer2.createInstance();
- }
- // Replaces the first-input record holder with a fresh instance.
- protected void resetReuse1() {
- this.reuse1 = srSerializer1.createInstance();
- }
- // Replaces the second-input record holder with a fresh instance.
- protected void resetReuse2() {
- this.reuse2 = srSerializer2.createInstance();
- }
- // Main loop: reads records while isRunning and dispatches each to the handler of the input it came from.
- @Override
- public void invoke() throws Exception {
- while (isRunning) {
- int next; // 0 ends the loop, 1 selects the first-input branch, any other value the second-input branch
- try {
- next = recordIterator.next(reuse1, reuse2);
- } catch (IOException e) {
- if (isRunning) {
- throw new RuntimeException("Could not read next record.", e);
- } else {
- // Task already cancelled do nothing
- next = 0;
- }
- } catch (IllegalStateException e) { // handled exactly like the IOException case above
- if (isRunning) {
- throw new RuntimeException("Could not read next record.", e);
- } else {
- // Task already cancelled do nothing
- next = 0;
- }
- }
-
- if (next == 0) {
- break;
- } else if (next == 1) {
- initialize1();
- handleStream1();
- resetReuse1(); // fresh holder for the next first-input record
- } else {
- initialize2();
- handleStream2();
- resetReuse2(); // fresh holder for the next second-input record
- }
- }
- }
- // Called by invoke() for a record of the first input (next == 1).
- protected abstract void handleStream1() throws Exception;
- // Called by invoke() for a record of the second input.
- protected abstract void handleStream2() throws Exception;
- // User-function call for the first input; run by callUserFunctionAndLogException1().
- protected abstract void callUserFunction1() throws Exception;
- // User-function call for the second input; run by callUserFunctionAndLogException2().
- protected abstract void callUserFunction2() throws Exception;
- // Hook invoked before handleStream1(); empty here.
- protected void initialize1() {
-
- };
- // Hook invoked before handleStream2(); empty here.
- protected void initialize2() {
-
- };
- // Runs callUserFunction1(); any exception is logged at error level and rethrown wrapped in a RuntimeException.
- protected void callUserFunctionAndLogException1() {
- try {
- callUserFunction1();
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Calling user function failed due to: {}",
- StringUtils.stringifyException(e));
- }
- throw new RuntimeException(e);
- }
- }
- // Runs callUserFunction2(); any exception is logged at error level and rethrown wrapped in a RuntimeException.
- protected void callUserFunctionAndLogException2() {
- try {
- callUserFunction2();
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Calling user function failed due to: {}",
- StringUtils.stringifyException(e));
- }
- throw new RuntimeException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
deleted file mode 100644
index 5499dba..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-
-public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> { // maps each input with its own method and emits the result
- private static final long serialVersionUID = 1L;
-
- private CoMapFunction<IN1, IN2, OUT> mapper; // user function with one map method per input
- // Registers the function with the superclass and keeps a typed reference to it.
- public CoMapInvokable(CoMapFunction<IN1, IN2, OUT> mapper) {
- super(mapper);
- this.mapper = mapper;
- }
- // Record from the first input: run callUserFunction1() through the logging wrapper.
- @Override
- public void handleStream1() throws Exception {
- callUserFunctionAndLogException1();
- }
- // Record from the second input: run callUserFunction2() through the logging wrapper.
- @Override
- public void handleStream2() throws Exception {
- callUserFunctionAndLogException2();
- }
- // Emits map1 applied to the object held by reuse1.
- @Override
- protected void callUserFunction1() throws Exception {
- collector.collect(mapper.map1(reuse1.getObject()));
-
- }
- // Emits map2 applied to the object held by reuse2.
- @Override
- protected void callUserFunction2() throws Exception {
- collector.collect(mapper.map2(reuse2.getObject()));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
deleted file mode 100644
index 057dfce..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-
-public class CoReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> { // rolling reduce per input, each result mapped to OUT
- private static final long serialVersionUID = 1L;
-
- protected CoReduceFunction<IN1, IN2, OUT> coReducer; // supplies reduce1/reduce2 and map1/map2
- protected IN1 currentValue1 = null; // running value of the first input; null until its first element
- protected IN2 currentValue2 = null; // running value of the second input; null until its first element
- protected IN1 nextValue1 = null; // element of the first input currently being processed
- protected IN2 nextValue2 = null; // element of the second input currently being processed
- // Registers the function with the superclass and resets both running values to null.
- public CoReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer) {
- super(coReducer);
- this.coReducer = coReducer;
- currentValue1 = null;
- currentValue2 = null;
- }
- // First input: remember the incoming object, then run callUserFunction1() through the logging wrapper.
- @Override
- public void handleStream1() throws Exception {
- nextValue1 = reuse1.getObject();
- callUserFunctionAndLogException1();
- }
- // Second input: remember the incoming object, then run callUserFunction2() through the logging wrapper.
- @Override
- public void handleStream2() throws Exception {
- nextValue2 = reuse2.getObject();
- callUserFunctionAndLogException2();
- }
- // Updates the first running value (the first element is taken as-is) and emits map1 of it.
- @Override
- protected void callUserFunction1() throws Exception {
- if (currentValue1 != null) {
- currentValue1 = coReducer.reduce1(currentValue1, nextValue1);
- } else {
- currentValue1 = nextValue1;
- }
- collector.collect(coReducer.map1(currentValue1));
- }
- // Updates the second running value (the first element is taken as-is) and emits map2 of it.
- @Override
- protected void callUserFunction2() throws Exception {
- if (currentValue2 != null) {
- currentValue2 = coReducer.reduce2(currentValue2, nextValue2);
- } else {
- currentValue2 = nextValue2;
- }
- collector.collect(coReducer.map2(currentValue2));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
deleted file mode 100644
index 93f597f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.math.util.MathUtils;
-import org.apache.flink.streaming.api.function.co.CoWindowFunction;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.state.CircularFifoList;
-
-/**
- * Co-invokable that buffers the elements of two inputs in a shared,
- * timestamp-driven sliding window and hands copies of both buffers to a
- * {@link CoWindowFunction} whenever a window ends.
- *
- * The window is split into mini-batches whose length is the greatest common
- * divisor of the window size and the slide size; see {@link StreamWindow}.
- */
-public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
- private static final long serialVersionUID = 1L;
-
- protected CoWindowFunction<IN1, IN2, OUT> coWindowFunction;
- protected long windowSize;
- protected long slideSize;
- // NOTE(review): these two lists are created in the constructor but are not
- // referenced anywhere else in this class; the buffers that are actually
- // filled and read are the ones owned by StreamWindow below.
- protected CircularFifoList<StreamRecord<IN1>> circularList1;
- protected CircularFifoList<StreamRecord<IN2>> circularList2;
- protected TimestampWrapper<IN1> timeStamp1;
- protected TimestampWrapper<IN2> timeStamp2;
-
- protected StreamWindow window;
-
- // Start of the mini-batch that is currently being filled; advanced by
- // StreamWindow.miniBatchEnd().
- protected long startTime;
- // Timestamp of the element most recently passed to checkWindowEnd().
- protected long nextRecordTime;
-
- /**
- * @param coWindowFunction
- * function called with the contents of both buffers
- * @param windowSize
- * length of the window, in timestamp units
- * @param slideInterval
- * slide of the window, in timestamp units
- * @param timeStamp1
- * timestamp extractor for the first input; also supplies the
- * initial start time
- * @param timeStamp2
- * timestamp extractor for the second input
- */
- public CoWindowInvokable(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize,
- long slideInterval, TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
- super(coWindowFunction);
- this.coWindowFunction = coWindowFunction;
- this.windowSize = windowSize;
- this.slideSize = slideInterval;
- this.circularList1 = new CircularFifoList<StreamRecord<IN1>>();
- this.circularList2 = new CircularFifoList<StreamRecord<IN2>>();
- this.timeStamp1 = timeStamp1;
- this.timeStamp2 = timeStamp2;
- // Only the first input's wrapper is consulted for the start time.
- this.startTime = timeStamp1.getStartTime();
-
- // Must come after windowSize/slideSize are set: the StreamWindow
- // constructor reads them.
- this.window = new StreamWindow();
- }
-
- /** Adds the object held by {@code reuse1} to the first window buffer. */
- @Override
- protected void handleStream1() throws Exception {
- window.addToBuffer1(reuse1.getObject());
- }
-
- /** Adds the object held by {@code reuse2} to the second window buffer. */
- @Override
- protected void handleStream2() throws Exception {
- window.addToBuffer2(reuse2.getObject());
- }
-
- /**
- * Copies the current contents of both buffers with the input serializers
- * and passes the copies to the co-window function. Nothing is called when
- * both buffers are empty.
- */
- @Override
- protected void callUserFunction() throws Exception {
-
- List<IN1> first = new ArrayList<IN1>();
- List<IN2> second = new ArrayList<IN2>();
-
- for (IN1 element : window.circularList1.getElements()) {
- first.add(serializer1.copy(element));
- }
- for (IN2 element : window.circularList2.getElements()) {
- second.add(serializer2.copy(element));
- }
-
- if (!window.circularList1.isEmpty() || !window.circularList2.isEmpty()) {
- coWindowFunction.coWindow(first, second, collector);
- }
- }
-
- /**
- * Bookkeeping for the sliding window shared by both inputs. Time is cut
- * into mini-batches of {@code granularity} timestamp units; a window spans
- * {@code numberOfBatches} mini-batches and slides by {@code batchPerSlide}
- * of them.
- */
- protected class StreamWindow implements Serializable {
- private static final long serialVersionUID = 1L;
-
- protected int granularity;
- protected int batchPerSlide;
- protected long numberOfBatches;
-
- // Number of mini-batches closed so far within the current window.
- protected long minibatchCounter;
-
- protected CircularFifoList<IN1> circularList1;
- protected CircularFifoList<IN2> circularList2;
-
- /**
- * Derives the mini-batch layout from the enclosing invokable's
- * {@code windowSize} and {@code slideSize} as they are at construction
- * time.
- */
- public StreamWindow() {
- this.granularity = (int) MathUtils.gcd(windowSize, slideSize);
- this.batchPerSlide = (int) (slideSize / granularity);
- this.numberOfBatches = windowSize / granularity;
- this.circularList1 = new CircularFifoList<IN1>();
- this.circularList2 = new CircularFifoList<IN2>();
- this.minibatchCounter = 0;
- }
-
- /**
- * Advances the window to the timestamp of {@code nextValue} and then
- * buffers the value for the first input, unless the mini-batch counter
- * is negative.
- */
- public void addToBuffer1(IN1 nextValue) throws Exception {
- checkWindowEnd(timeStamp1.getTimestamp(nextValue));
- if (minibatchCounter >= 0) {
- circularList1.add(nextValue);
- }
- }
-
- /**
- * Advances the window to the timestamp of {@code nextValue} and then
- * buffers the value for the second input, unless the mini-batch counter
- * is negative.
- */
- public void addToBuffer2(IN2 nextValue) throws Exception {
- checkWindowEnd(timeStamp2.getTimestamp(nextValue));
- if (minibatchCounter >= 0) {
- circularList2.add(nextValue);
- }
- }
-
- /**
- * Closes every mini-batch that ends at or before {@code timeStamp}.
- * Each closed mini-batch starts a new slide in both buffers; when a
- * full window has been collected the user function is invoked and
- * both buffers are shifted by one slide.
- */
- protected synchronized void checkWindowEnd(long timeStamp) {
- nextRecordTime = timeStamp;
-
- // miniBatchEnd() advances startTime on every true result, so this
- // loop terminates once startTime has caught up with the timestamp.
- while (miniBatchEnd()) {
- circularList1.newSlide();
- circularList2.newSlide();
- minibatchCounter++;
- if (windowEnd()) {
- callUserFunctionAndLogException();
- circularList1.shiftWindow(batchPerSlide);
- circularList2.shiftWindow(batchPerSlide);
- }
- }
- }
-
- /**
- * Returns whether the latest timestamp lies at or beyond the end of
- * the current mini-batch. Not a pure query: on {@code true} it also
- * moves {@code startTime} forward by one granularity.
- */
- protected boolean miniBatchEnd() {
- if (nextRecordTime < startTime + granularity) {
- return false;
- } else {
- startTime += granularity;
- return true;
- }
- }
-
- /**
- * Returns whether a full window of mini-batches has been collected.
- * Not a pure query: on {@code true} it also reduces the counter by
- * one slide.
- */
- public boolean windowEnd() {
- if (minibatchCounter == numberOfBatches) {
- minibatchCounter -= batchPerSlide;
- return true;
- }
- return false;
- }
-
- /**
- * Invokes the user function unless {@link #miniBatchEnd()} reports a
- * finished mini-batch (in which case {@code startTime} is advanced as
- * a side effect).
- */
- public void reduceLastBatch() {
- if (!miniBatchEnd()) {
- callUserFunctionAndLogException();
- }
- }
-
- public Iterable<IN1> getIterable1() {
- return circularList1.getIterable();
- }
-
- public Iterable<IN2> getIterable2() {
- return circularList2.getIterable();
- }
-
- /** String form of the first buffer only. */
- @Override
- public String toString() {
- return circularList1.toString();
- }
-
- }
-
- /**
- * Invokes the user function one last time (same condition as
- * {@code StreamWindow.reduceLastBatch()}) before closing the superclass.
- */
- @Override
- public void close() {
- if (!window.miniBatchEnd()) {
- callUserFunctionAndLogException();
- }
- super.close();
- }
-
- // No-op in this class.
- @Override
- protected void callUserFunction1() throws Exception {
- }
-
- // No-op in this class.
- @Override
- protected void callUserFunction2() throws Exception {
- }
-
- /**
- * Sets the slide size field.
- *
- * NOTE(review): the values StreamWindow derived from the slide size
- * (granularity, batchPerSlide, numberOfBatches) are computed once in its
- * constructor and are not refreshed here - confirm whether callers expect
- * an already constructed window to pick up the new slide size.
- */
- public void setSlideSize(long slideSize) {
- this.slideSize = slideSize;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java
deleted file mode 100644
index 0f2ee31..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * Filter that lets a {@link StreamWindow} pass only when it is not empty.
- */
-public class EmptyWindowFilter<OUT> implements FilterFunction<StreamWindow<OUT>> {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * @param value
- * the window to test
- * @return {@code false} for an empty window, {@code true} otherwise
- */
- @Override
- public boolean filter(StreamWindow<OUT> value) throws Exception {
- if (value.isEmpty()) {
- return false;
- }
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedActiveDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedActiveDiscretizer.java
deleted file mode 100644
index 35f466f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedActiveDiscretizer.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-
-/**
- * Grouped discretizer for centrally triggered active policies. It keeps one
- * {@link StreamDiscretizer} per key and runs a single background thread
- * ({@link CentralCheck}) that periodically notifies the trigger of every
- * group about the last element seen on the input.
- *
- * All access to {@code groupedDiscretizers} is guarded by the monitor of the
- * map itself, because the map and the per-group discretizers are touched by
- * both the task thread and the background thread.
- */
-public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
-
- private static final long serialVersionUID = -3469545957144404137L;
-
- // Copy of the most recently read element. Written by invoke() and read by
- // the CentralCheck thread, hence volatile.
- private volatile IN last;
- private Thread centralThread;
-
- public GroupedActiveDiscretizer(KeySelector<IN, ?> keySelector,
- CentralActiveTrigger<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy) {
- super(keySelector, triggerPolicy, evictionPolicy);
- }
-
- /**
- * Creates the discretizer for a key that has not been seen before, using
- * clones of the trigger and eviction policies.
- */
- @Override
- protected StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception {
-
- StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(),
- evictionPolicy.clone());
-
- groupDiscretizer.collector = taskContext.getOutputCollector();
- // We omit the groupDiscretizer.open(...) call here to avoid starting
- // new active threads
- return groupDiscretizer;
- }
-
- /**
- * Routes every input element to the discretizer of its key, creating the
- * group on first sight, and emits the window of every group once the
- * input is exhausted or the invokable is stopped.
- */
- @Override
- public void invoke() throws Exception {
-
- while (isRunning && readNext() != null) {
- last = copy(nextObject);
- Object key = keySelector.getKey(nextObject);
-
- synchronized (groupedDiscretizers) {
- StreamDiscretizer<IN> groupDiscretizer = groupedDiscretizers.get(key);
-
- if (groupDiscretizer == null) {
- groupDiscretizer = makeNewGroup(key);
- groupedDiscretizers.put(key, groupDiscretizer);
- }
-
- groupDiscretizer.processRealElement(nextObject);
- }
-
- }
-
- // The CentralCheck thread may still be iterating over the groups and
- // firing triggers at this point, so the final emit has to hold the same
- // lock as every other access to groupedDiscretizers.
- synchronized (groupedDiscretizers) {
- for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
- group.emitWindow();
- }
- }
-
- }
-
- /** Opens the superclass and starts the central trigger-check thread. */
- @Override
- public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
- super.open(parameters);
- centralThread = new Thread(new CentralCheck());
- centralThread.start();
- }
-
- /**
- * Background loop: while the invokable is running, sleeps for two seconds
- * and then offers the last seen element to the central trigger of every
- * group, firing the group for each fake element the trigger returns.
- */
- private class CentralCheck implements Runnable {
-
- @Override
- public void run() {
- while (isRunning) {
- // wait for the specified granularity
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- // ignore it... the loop condition re-checks isRunning
- }
-
- try {
- if (last != null) {
- synchronized (groupedDiscretizers) {
- for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
-
- CentralActiveTrigger<IN> groupTrigger = (CentralActiveTrigger<IN>) group.triggerPolicy;
- Object[] fakes = groupTrigger.notifyOnLastGlobalElement(last);
- if (fakes != null) {
- for (Object fake : fakes) {
- group.triggerOnFakeElement(fake);
- }
- }
- }
- }
-
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
deleted file mode 100644
index f14a6ae..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-
-/**
- * This invokable represents the grouped discretization step of a window
- * transformation. The user supplied eviction and trigger policies are applied
- * on a per group basis to create the {@link StreamWindow} that will be further
- * transformed in the next stages. </p> To allow pre-aggregations supply an
- * appropriate {@link WindowBuffer}
- */
-public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
-
- private static final long serialVersionUID = -3469545957144404137L;
-
- protected KeySelector<IN, ?> keySelector;
- protected Configuration parameters;
- protected CloneableTriggerPolicy<IN> triggerPolicy;
- protected CloneableEvictionPolicy<IN> evictionPolicy;
-
- protected Map<Object, StreamDiscretizer<IN>> groupedDiscretizers;
-
- public GroupedStreamDiscretizer(KeySelector<IN, ?> keySelector,
- CloneableTriggerPolicy<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy) {
-
- super(triggerPolicy, evictionPolicy);
-
- this.keySelector = keySelector;
-
- this.triggerPolicy = triggerPolicy;
- this.evictionPolicy = evictionPolicy;
-
- this.groupedDiscretizers = new HashMap<Object, StreamDiscretizer<IN>>();
- }
-
- @Override
- public void invoke() throws Exception {
-
- while (isRunning && readNext() != null) {
-
- Object key = keySelector.getKey(nextObject);
-
- StreamDiscretizer<IN> groupDiscretizer = groupedDiscretizers.get(key);
-
- if (groupDiscretizer == null) {
- groupDiscretizer = makeNewGroup(key);
- groupedDiscretizers.put(key, groupDiscretizer);
- }
-
- groupDiscretizer.processRealElement(nextObject);
- }
-
- for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
- group.emitWindow();
- }
-
- }
-
- /**
- * This method creates a new group. The method gets called in case an
- * element arrives which has a key which was not seen before. The method
- * created a nested {@link StreamDiscretizer} and therefore created clones
- * of all distributed trigger and eviction policies.
- *
- * @param key
- * The key of the new group.
- */
- protected StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception {
-
- StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(),
- evictionPolicy.clone());
-
- groupDiscretizer.collector = taskContext.getOutputCollector();
- groupDiscretizer.open(this.parameters);
-
- return groupDiscretizer;
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof GroupedStreamDiscretizer)) {
- return false;
- } else {
- try {
- @SuppressWarnings("unchecked")
- GroupedStreamDiscretizer<IN> otherDiscretizer = (GroupedStreamDiscretizer<IN>) other;
-
- return triggerPolicy.equals(otherDiscretizer.triggerPolicy)
- && evictionPolicy.equals(otherDiscretizer.evictionPolicy)
- && keySelector.equals(otherDiscretizer.keySelector);
-
- } catch (ClassCastException e) {
- return false;
- }
- }
- }
-
- @Override
- public String toString() {
- return "GroupedDiscretizer(Key: " + keySelector.getClass().getSimpleName() + ", Trigger: "
- + triggerPolicy.toString() + ", Eviction: " + evictionPolicy.toString() + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
deleted file mode 100644
index 2c3bd75..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-
-/**
- * Grouped variant of {@link WindowBufferInvokable}: every key extracted by the
- * {@link KeySelector} gets its own {@link WindowBuffer}, cloned from the
- * buffer passed to the constructor the first time the key is seen. Incoming
- * {@link WindowEvent}s are then handled against the buffer of their key.
- */
-public class GroupedWindowBufferInvokable<T> extends WindowBufferInvokable<T> {
-
- private static final long serialVersionUID = 1L;
- // Buffer per key, filled lazily in callUserFunction().
- private Map<Object, WindowBuffer<T>> windowMap = new HashMap<Object, WindowBuffer<T>>();
- private KeySelector<T, ?> keySelector;
-
- /**
- * @param buffer
- * buffer that is cloned for every new key
- * @param keySelector
- * extracts the grouping key from the element of an event
- */
- public GroupedWindowBufferInvokable(WindowBuffer<T> buffer, KeySelector<T, ?> keySelector) {
- super(buffer);
- this.keySelector = keySelector;
- }
-
- /** Handles input events until the input ends or the invokable stops. */
- @Override
- public void invoke() throws Exception {
- while (isRunning && readNext() != null) {
- callUserFunctionAndLogException();
- }
- }
-
- /**
- * Looks up (or creates) the buffer belonging to the key of the current
- * event and handles the event against it. Events without an element are
- * skipped.
- */
- @Override
- protected void callUserFunction() throws Exception {
- if (nextObject.getElement() == null) {
- return;
- }
-
- Object groupKey = keySelector.getKey(nextObject.getElement());
- WindowBuffer<T> keyedBuffer = windowMap.get(groupKey);
- if (keyedBuffer == null) {
- keyedBuffer = buffer.clone();
- windowMap.put(groupKey, keyedBuffer);
- }
-
- handleWindowEvent(nextObject, keyedBuffer);
- }
-
- /** Handles a directly collected event; ignored once no longer running. */
- @Override
- public void collect(WindowEvent<T> record) {
- if (!isRunning) {
- return;
- }
- nextObject = record;
- callUserFunctionAndLogException();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java
deleted file mode 100644
index 737485f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * Variant of the {@link ParallelMerge} CoFlatMap that does not reduce incoming
- * elements; it only appends them to the current window. Grouped reduces need
- * this behaviour.
- */
-public class ParallelGroupedMerge<OUT> extends ParallelMerge<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- /** Passes {@code null} as the superclass constructor argument. */
- public ParallelGroupedMerge() {
- super(null);
- }
-
- /**
- * Appends all elements of {@code nextWindow} to {@code current}.
- *
- * @param current
- * the window being assembled
- * @param nextWindow
- * the window whose elements are added
- */
- @Override
- protected void updateCurrent(StreamWindow<OUT> current, StreamWindow<OUT> nextWindow)
- throws Exception {
- current.addAll(nextWindow);
- }
-
-}