You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:44 UTC
[08/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and
API
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java
new file mode 100644
index 0000000..b1c080e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import java.lang.reflect.Array;
+
+/**
+ * Extracts multiple fields from an array and puts them into a new array of the
+ * specified type.
+ *
+ * @param <OUT>
+ * The type of the output array. If out is set to String, the output
+ * of the extractor will be a String[]. If it is set to String[] the
+ * output will be String[][].
+ */
+public class FieldsFromArray<OUT> implements Extractor<Object, OUT[]> {
+
+ /**
+ * Auto-generated version id
+ */
+ private static final long serialVersionUID = 8075055384516397670L;
+ private int[] order;
+ private Class<OUT> clazz;
+
+ /**
+ * Extracts multiple fields from an array and puts them in the given order
+ * into a new array of the specified type.
+ *
+ * @param clazz
+ * the Class object representing the component type of the new
+ * array
+ * @param indexes
+ * The indexes of the fields to be extracted. Any order is
+ * possible, but not more than 255 fields due to limitations in
+ * {@link Array#newInstance(Class, int...)}.
+ */
+ public FieldsFromArray(Class<OUT> clazz, int... indexes) {
+ this.order = indexes;
+ this.clazz = clazz;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public OUT[] extract(Object in) {
+ OUT[] output = (OUT[]) Array.newInstance(clazz, order.length);
+ for (int i = 0; i < order.length; i++) {
+ output[i] = (OUT) Array.get(in, this.order[i]);
+ }
+ return output;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java
new file mode 100644
index 0000000..fc7f3ab
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Extracts one or more fields of the type Double from a tuple and puts them
+ * into a new double[]
+ */
+public class FieldsFromTuple implements Extractor<Tuple, double[]> {
+
+ /**
+ * auto generated version id
+ */
+ private static final long serialVersionUID = -2554079091050273761L;
+ int[] indexes;
+
+ /**
+ * Extracts one or more fields of the the type Double from a tuple and puts
+ * them into a new double[] (in the specified order).
+ *
+ * @param indexes
+ * The indexes of the fields to be extracted.
+ */
+ public FieldsFromTuple(int... indexes) {
+ this.indexes = indexes;
+ }
+
+ @Override
+ public double[] extract(Tuple in) {
+ double[] out = new double[indexes.length];
+ for (int i = 0; i < indexes.length; i++) {
+ out[i] = (Double) in.getField(indexes[i]);
+ }
+ return out;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index cfa6d93..2ca82b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -575,8 +575,6 @@ public class StreamGraph extends StreamingPlan {
setJobName(jobGraphName);
- WindowingOptimizer.optimizeGraph(this);
-
StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
return jobgraphGenerator.createJobGraph(jobGraphName);
@@ -585,8 +583,6 @@ public class StreamGraph extends StreamingPlan {
@Override
public String getStreamingPlanAsJSON() {
- WindowingOptimizer.optimizeGraph(this);
-
try {
return new JSONGenerator(this).getJSON();
} catch (JSONException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
deleted file mode 100644
index cbd2a40..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer;
-import org.apache.flink.streaming.api.operators.windowing.WindowFlattener;
-import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
-
-public class WindowingOptimizer {
-
- public static void optimizeGraph(StreamGraph streamGraph) {
-
- // Share common discrtizers
- setDiscretizerReuse(streamGraph);
-
- // Remove unnecessary merges before flatten operators
- removeMergeBeforeFlatten(streamGraph);
- }
-
- @SuppressWarnings("rawtypes")
- private static void removeMergeBeforeFlatten(StreamGraph streamGraph) {
- Set<Tuple2<Integer, StreamOperator<?>>> operators = streamGraph.getOperators();
- List<Integer> flatteners = new ArrayList<Integer>();
-
- for (Tuple2<Integer, StreamOperator<?>> entry : operators) {
- if (entry.f1 instanceof WindowFlattener) {
- flatteners.add(entry.f0);
- }
- }
-
- for (Integer flattenerId : flatteners) {
- // Flatteners should have exactly one input
- StreamNode input = streamGraph.getStreamNode(flattenerId).getInEdges().get(0)
- .getSourceVertex();
-
- // Check whether the flatten is applied after a merge
- if (input.getOperator() instanceof WindowMerger) {
-
- // Mergers should have exactly one input
- StreamNode mergeInput = input.getInEdges().get(0).getSourceVertex();
-
- // We connect the merge input to the flattener directly
- streamGraph.addEdge(mergeInput.getId(), flattenerId, 0);
-
- // If the merger is only connected to the flattener we delete it
- // completely, otherwise we only remove the edge
- if (input.getOutEdges().size() > 1) {
- streamGraph.removeEdge(streamGraph.getStreamEdge(input.getId(), flattenerId));
- } else {
- streamGraph.removeVertex(input);
- }
-
- streamGraph.setParallelism(flattenerId, mergeInput.getParallelism());
- }
- }
-
- }
-
- private static void setDiscretizerReuse(StreamGraph streamGraph) {
-
- Collection<StreamNode> nodes = streamGraph.getStreamNodes();
- List<StreamNode> discretizers = new ArrayList<StreamNode>();
-
- for (StreamNode node : nodes) {
- if (node.getOperator() instanceof StreamDiscretizer) {
- discretizers.add(node);
- }
- }
-
- List<Tuple2<StreamDiscretizer<?>, List<StreamNode>>> matchingDiscretizers = new ArrayList<Tuple2<StreamDiscretizer<?>, List<StreamNode>>>();
-
- for (StreamNode discretizer : discretizers) {
- boolean matchedAny = false;
- for (Tuple2<StreamDiscretizer<?>, List<StreamNode>> candidate : matchingDiscretizers) {
-
- Set<Integer> discretizerInEdges = new HashSet<Integer>(
- discretizer.getInEdgeIndices());
- Set<Integer> toMatchInEdges = new HashSet<Integer>(candidate.f1.get(0)
- .getInEdgeIndices());
-
- boolean partitionersMatch = true;
-
- for (StreamEdge edge1 : discretizer.getInEdges()) {
- for (StreamEdge edge2 : candidate.f1.get(0).getInEdges()) {
- if (edge1.getPartitioner().getClass() != edge2.getPartitioner().getClass()) {
- partitionersMatch = false;
- }
- }
- }
-
- if (partitionersMatch
- && discretizer.getParallelism() == candidate.f1.get(0).getParallelism()
- && discretizer.getOperator().equals(candidate.f0)
- && discretizerInEdges.equals(toMatchInEdges)) {
-
- candidate.f1.add(discretizer);
- matchedAny = true;
- break;
- }
- }
- if (!matchedAny) {
- List<StreamNode> matchingNodes = new ArrayList<StreamNode>();
- matchingNodes.add(discretizer);
- matchingDiscretizers.add(new Tuple2<StreamDiscretizer<?>, List<StreamNode>>(
- (StreamDiscretizer<?>) discretizer.getOperator(), matchingNodes));
- }
- }
-
- for (Tuple2<StreamDiscretizer<?>, List<StreamNode>> matching : matchingDiscretizers) {
- List<StreamNode> matchList = matching.f1;
- if (matchList.size() > 1) {
- StreamNode first = matchList.get(0);
- for (int i = 1; i < matchList.size(); i++) {
- replaceDiscretizer(streamGraph, matchList.get(i).getId(), first.getId());
- }
- }
- }
- }
-
- private static void replaceDiscretizer(StreamGraph streamGraph, Integer toReplaceID,
- Integer replaceWithId) {
- // Convert to array to create a copy
- List<StreamEdge> outEdges = new ArrayList<StreamEdge>(streamGraph
- .getStreamNode(toReplaceID).getOutEdges());
-
- int numOutputs = outEdges.size();
-
- // Reconnect outputs
- for (int i = 0; i < numOutputs; i++) {
- StreamEdge outEdge = outEdges.get(i);
-
- streamGraph.addEdge(replaceWithId, outEdge.getTargetId(), 0);
- }
-
- // Remove the other discretizer
- streamGraph.removeVertex(streamGraph.getStreamNode(toReplaceID));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/EmptyWindowFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/EmptyWindowFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/EmptyWindowFilter.java
deleted file mode 100644
index e0fbd89..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/EmptyWindowFilter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-public class EmptyWindowFilter<OUT> implements FilterFunction<StreamWindow<OUT>> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(StreamWindow<OUT> value) throws Exception {
- return !value.isEmpty();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
deleted file mode 100644
index 5141598..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(GroupedActiveDiscretizer.class);
-
- private volatile IN last;
- private Thread centralThread;
- private CentralCheck centralCheck;
-
- public GroupedActiveDiscretizer(KeySelector<IN, ?> keySelector,
- CentralActiveTrigger<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy) {
- super(keySelector, triggerPolicy, evictionPolicy);
- }
-
- @Override
- protected StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception {
-
- StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(),
- evictionPolicy.clone());
-
- groupDiscretizer.setup(this.output, this.runtimeContext);
- // We omit the groupDiscretizer.open(...) call here to avoid starting
- // new active threads
- return groupDiscretizer;
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
-
-// last = copy(element);
- last = element.getValue();
- Object key = keySelector.getKey(element.getValue());
-
- synchronized (groupedDiscretizers) {
- StreamDiscretizer<IN> groupDiscretizer = groupedDiscretizers.get(key);
-
- if (groupDiscretizer == null) {
- groupDiscretizer = makeNewGroup(key);
- groupedDiscretizers.put(key, groupDiscretizer);
- }
-
- groupDiscretizer.processRealElement(element);
- }
- }
-
- @Override
- public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
- super.open(parameters);
- centralCheck = new CentralCheck();
- centralThread = new Thread(centralCheck);
- centralThread.start();
- }
-
- @Override
- public void dispose() {
- try {
- centralCheck.running = false;
- centralThread.interrupt();
- centralThread.join();
- } catch (InterruptedException e) {
- LOG.info("GroupedActiveDiscretizer got interruped while joining with central thread: {}", e);
- }
- }
-
- private class CentralCheck implements Runnable {
-
- volatile boolean running = true;
-
- @Override
- public void run() {
- while (running) {
- // wait for the specified granularity
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- // ignore it...
- }
-
- try {
- if (last != null) {
- synchronized (groupedDiscretizers) {
- for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
-
- CentralActiveTrigger<IN> groupTrigger = (CentralActiveTrigger<IN>) group.triggerPolicy;
- Object[] fakes = groupTrigger.notifyOnLastGlobalElement(last);
- if (fakes != null) {
- for (Object fake : fakes) {
- group.triggerOnFakeElement(fake);
- }
- }
- }
- }
-
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
deleted file mode 100644
index e3cab5c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * This operator represents the grouped discretization step of a window
- * transformation. The user supplied eviction and trigger policies are applied
- * on a per group basis to create the {@link StreamWindow} that will be further
- * transformed in the next stages. </p> To allow pre-aggregations supply an
- * appropriate {@link WindowBuffer}.
- */
-public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
-
- private static final long serialVersionUID = 1L;
-
- protected KeySelector<IN, ?> keySelector;
- protected Configuration parameters;
- protected CloneableTriggerPolicy<IN> triggerPolicy;
- protected CloneableEvictionPolicy<IN> evictionPolicy;
-
- protected Map<Object, StreamDiscretizer<IN>> groupedDiscretizers;
-
- public GroupedStreamDiscretizer(KeySelector<IN, ?> keySelector,
- CloneableTriggerPolicy<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy) {
-
- super(triggerPolicy, evictionPolicy);
-
- this.keySelector = keySelector;
-
- this.triggerPolicy = triggerPolicy;
- this.evictionPolicy = evictionPolicy;
-
- this.groupedDiscretizers = new HashMap<Object, StreamDiscretizer<IN>>();
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
- group.emitWindow();
- }
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
-
-
- Object key = keySelector.getKey(element.getValue());
-
- StreamDiscretizer<IN> groupDiscretizer = groupedDiscretizers.get(key);
-
- if (groupDiscretizer == null) {
- groupDiscretizer = makeNewGroup(key);
- groupedDiscretizers.put(key, groupDiscretizer);
- }
-
- groupDiscretizer.processRealElement(element);
-
- }
-
- /**
- * This method creates a new group. The method gets called in case an
- * element arrives which has a key which was not seen before. The method
- * created a nested {@link StreamDiscretizer} and therefore created clones
- * of all distributed trigger and eviction policies.
- *
- * @param key
- * The key of the new group.
- */
- protected StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception {
-
- StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(),
- evictionPolicy.clone());
-
- // TODO: this seems very hacky, maybe we can get around this
- groupDiscretizer.setup(this.output, this.runtimeContext);
- groupDiscretizer.open(this.parameters);
-
- return groupDiscretizer;
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof GroupedStreamDiscretizer)) {
- return false;
- } else {
- try {
- @SuppressWarnings("unchecked")
- GroupedStreamDiscretizer<IN> otherDiscretizer = (GroupedStreamDiscretizer<IN>) other;
-
- return triggerPolicy.equals(otherDiscretizer.triggerPolicy)
- && evictionPolicy.equals(otherDiscretizer.evictionPolicy)
- && keySelector.equals(otherDiscretizer.keySelector);
-
- } catch (ClassCastException e) {
- return false;
- }
- }
- }
-
- @Override
- public String toString() {
- return "GroupedDiscretizer(Key: " + keySelector.getClass().getSimpleName() + ", Trigger: "
- + triggerPolicy.toString() + ", Eviction: " + evictionPolicy.toString() + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
deleted file mode 100644
index c74b96e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * This operator flattens the results of the window transformations by
- * outputing the elements of the {@link StreamWindow} one-by-one
- */
-public class GroupedWindowBuffer<T> extends StreamWindowBuffer<T> {
-
- private static final long serialVersionUID = 1L;
-
- private KeySelector<T, ?> keySelector;
-
- private transient Map<Object, WindowBuffer<T>> windowMap;
-
- public GroupedWindowBuffer(WindowBuffer<T> buffer, KeySelector<T, ?> keySelector) {
- super(buffer);
- this.keySelector = keySelector;
- this.windowMap = new HashMap<Object, WindowBuffer<T>>();
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- this.windowMap = new HashMap<Object, WindowBuffer<T>>();
- }
-
- @Override
- public void processElement(StreamRecord<WindowEvent<T>> event) throws Exception {
- if (event.getValue().getElement() != null) {
- Object key = keySelector.getKey(event.getValue().getElement());
- WindowBuffer<T> currentWindow = windowMap.get(key);
-
- if (currentWindow == null) {
- currentWindow = buffer.clone();
- windowMap.put(key, currentWindow);
- }
-
- handleWindowEvent(event.getValue(), currentWindow);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelGroupedMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelGroupedMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelGroupedMerge.java
deleted file mode 100644
index de947eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelGroupedMerge.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * The version of the ParallelMerge CoFlatMap that does not reduce the incoming
- * elements only appends them to the current window. This is necessary for
- * grouped reduces.
- */
-public class ParallelGroupedMerge<OUT> extends ParallelMerge<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- public ParallelGroupedMerge() {
- super(null);
- }
-
- @Override
- protected void updateCurrent(StreamWindow<OUT> current, StreamWindow<OUT> nextWindow)
- throws Exception {
- current.addAll(nextWindow);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
deleted file mode 100644
index ce7d887..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.util.Collector;
-
-/**
- * Class that encapsulates the functionality necessary to merge windows created
- * in parallel. This CoFlatMap uses the information received on the number of
- * parts for each window to merge the different parts. It waits until it
- * receives an indication on the number of parts from all the discretizers
- * before producing any output.
- */
-public class ParallelMerge<OUT> extends
- RichCoFlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> {
-
- private static final long serialVersionUID = 1L;
-
- protected Integer numberOfDiscretizers;
- private ReduceFunction<OUT> reducer;
-
- private Map<Integer, Integer> availableNumberOfParts = new HashMap<Integer, Integer>();
- private Map<Integer, Tuple2<StreamWindow<OUT>, Integer>> receivedWindows = new HashMap<Integer, Tuple2<StreamWindow<OUT>, Integer>>();
- private Map<Integer, Tuple2<Integer, Integer>> receivedNumberOfParts = new HashMap<Integer, Tuple2<Integer, Integer>>();
-
- public ParallelMerge(ReduceFunction<OUT> reducer) {
- this.reducer = reducer;
- }
-
- @Override
- public void flatMap1(StreamWindow<OUT> nextWindow, Collector<StreamWindow<OUT>> out)
- throws Exception {
-
- Integer id = nextWindow.windowID;
-
- Tuple2<StreamWindow<OUT>, Integer> current = receivedWindows.get(id);
-
- if (current == null) {
- current = new Tuple2<StreamWindow<OUT>, Integer>(nextWindow, 1);
- } else {
- updateCurrent(current.f0, nextWindow);
- current.f1++;
- }
-
- Integer count = current.f1;
-
- if (availableNumberOfParts.containsKey(id) && availableNumberOfParts.get(id) <= count) {
- out.collect(current.f0);
- receivedWindows.remove(id);
- availableNumberOfParts.remove(id);
-
- checkOld(id);
-
- } else {
- receivedWindows.put(id, (Tuple2<StreamWindow<OUT>, Integer>) current);
- }
- }
-
- private void checkOld(Integer id) {
- // In case we have remaining partial windows (which indicates errors in
- // processing), output and log them
- if (receivedWindows.containsKey(id - 1)) {
- throw new RuntimeException("Error in processing logic, window with id " + id
- + " should have already been processed");
- }
-
- }
-
- @Override
- public void flatMap2(Tuple2<Integer, Integer> partInfo, Collector<StreamWindow<OUT>> out)
- throws Exception {
-
- Integer id = partInfo.f0;
- Integer numOfParts = partInfo.f1;
-
- Tuple2<Integer, Integer> currentPartInfo = receivedNumberOfParts.get(id);
- if (currentPartInfo != null) {
- currentPartInfo.f0 += numOfParts;
- currentPartInfo.f1++;
- } else {
- currentPartInfo = new Tuple2<Integer, Integer>(numOfParts, 1);
- receivedNumberOfParts.put(id, currentPartInfo);
- }
-
- if (currentPartInfo.f1 >= numberOfDiscretizers) {
- receivedNumberOfParts.remove(id);
-
- Tuple2<StreamWindow<OUT>, Integer> current = receivedWindows.get(id);
-
- Integer count = current != null ? current.f1 : -1;
-
- if (count >= currentPartInfo.f0) {
- out.collect(current.f0);
- receivedWindows.remove(id);
- checkOld(id);
- } else if (currentPartInfo.f0 > 0) {
- availableNumberOfParts.put(id, currentPartInfo.f1);
- }
- }
-
- }
-
- protected void updateCurrent(StreamWindow<OUT> current, StreamWindow<OUT> nextWindow)
- throws Exception {
- if (current.size() != 1 || nextWindow.size() != 1) {
- throw new RuntimeException(
- "Error in parallel merge logic. Current window should contain only one element.");
- }
- OUT currentReduced = current.remove(0);
- currentReduced = reducer.reduce(currentReduced, nextWindow.get(0));
- current.add(currentReduced);
- }
-
- @Override
- public void open(Configuration conf) {
- this.numberOfDiscretizers = getRuntimeContext().getNumberOfParallelSubtasks();
- }
-
- Map<Integer, Tuple2<StreamWindow<OUT>, Integer>> getReceivedWindows() {
- return receivedWindows;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
deleted file mode 100644
index 74df3ad..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-public class ParallelMergeOperator<OUT> extends CoStreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> {
-
- private ParallelMerge<OUT> parallelMerge;
-
- public ParallelMergeOperator(ParallelMerge<OUT> parallelMerge) {
- super(parallelMerge);
- this.parallelMerge = parallelMerge;
- }
-
- @Override
- public void close() throws Exception {
- // emit remaining (partial) windows
-
- for (Tuple2<StreamWindow<OUT>, Integer> receivedWindow : parallelMerge.getReceivedWindows().values()) {
- getCollector().collect(receivedWindow.f0);
- }
-
- super.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
deleted file mode 100644
index a5e00aa..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * This operator represents the discretization step of a window transformation.
- * The user supplied eviction and trigger policies are applied to create the
- * {@link StreamWindow} that will be further transformed in the next stages.
- */
-public class StreamDiscretizer<IN>
- extends AbstractStreamOperator<WindowEvent<IN>>
- implements OneInputStreamOperator<IN, WindowEvent<IN>> {
-
- private static final long serialVersionUID = 1L;
-
- protected TriggerPolicy<IN> triggerPolicy;
- protected EvictionPolicy<IN> evictionPolicy;
- private boolean isActiveTrigger;
- private boolean isActiveEviction;
- private int bufferSize = 0;
-
- private transient Thread activePolicyThread;
-
- protected WindowEvent<IN> windowEvent = new WindowEvent<IN>();
-
- public StreamDiscretizer(TriggerPolicy<IN> triggerPolicy, EvictionPolicy<IN> evictionPolicy) {
- this.triggerPolicy = triggerPolicy;
- this.evictionPolicy = evictionPolicy;
-
- this.isActiveTrigger = triggerPolicy instanceof ActiveTriggerPolicy;
- this.isActiveEviction = evictionPolicy instanceof ActiveEvictionPolicy;
-
- this.chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
- }
-
- public TriggerPolicy<IN> getTrigger() {
- return triggerPolicy;
- }
-
- public EvictionPolicy<IN> getEviction() {
- return evictionPolicy;
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- processRealElement(element);
- }
-
- /**
- * This method processed an arrived real element The method is synchronized
- * to ensure that it cannot interleave with
- * {@link StreamDiscretizer#triggerOnFakeElement(Object)}
- *
- * @param input
- * a real input element
- * @throws Exception
- */
- protected synchronized void processRealElement(StreamRecord<IN> input) throws Exception {
-
- // Setting the input element in order to avoid NullFieldException when triggering on fake element
- windowEvent.setElement(input.getValue());
- if (isActiveTrigger) {
- ActiveTriggerPolicy<IN> trigger = (ActiveTriggerPolicy<IN>) triggerPolicy;
- Object[] result = trigger.preNotifyTrigger(input.getValue());
- for (Object in : result) {
- triggerOnFakeElement(in);
- }
- }
-
- boolean isTriggered = false;
-
- if (triggerPolicy.notifyTrigger(input.getValue())) {
- emitWindow();
- isTriggered = true;
- }
-
- evict(input.getValue(), isTriggered);
-
- output.collect(input.replace(windowEvent.setElement(input.getValue())));
- bufferSize++;
-
- }
-
- /**
- * This method triggers on an arrived fake element The method is
- * synchronized to ensure that it cannot interleave with
- * {@link StreamDiscretizer#processRealElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)}
- *
- * @param input
- * a fake input element
- */
- @SuppressWarnings("unchecked")
- protected synchronized void triggerOnFakeElement(Object input) {
- if (isActiveEviction) {
- activeEvict(input);
- emitWindow();
- } else {
- emitWindow();
- evict((IN) input, true);
- }
- }
-
- /**
- * This method emits the content of the buffer as a new {@link StreamWindow}
- * if not empty
- */
- protected void emitWindow() {
- output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setTrigger()));
- }
-
- private void activeEvict(Object input) {
- int numToEvict = 0;
-
- if (isActiveEviction) {
- ActiveEvictionPolicy<IN> ep = (ActiveEvictionPolicy<IN>) evictionPolicy;
- numToEvict = ep.notifyEvictionWithFakeElement(input, bufferSize);
- }
-
- if (numToEvict > 0) {
- output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setEviction(numToEvict)));
- bufferSize -= numToEvict;
- bufferSize = bufferSize >= 0 ? bufferSize : 0;
- }
- }
-
- private void evict(IN input, boolean isTriggered) {
- int numToEvict = evictionPolicy.notifyEviction(input, isTriggered, bufferSize);
-
- if (numToEvict > 0) {
- output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setEviction(numToEvict)));
- bufferSize -= numToEvict;
- bufferSize = bufferSize >= 0 ? bufferSize : 0;
- }
- }
-
- @Override
- public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
- super.open(parameters);
-
- if (isActiveTrigger) {
- ActiveTriggerPolicy<IN> tp = (ActiveTriggerPolicy<IN>) triggerPolicy;
-
- Runnable runnable = tp.createActiveTriggerRunnable(new WindowingCallback());
- if (runnable != null) {
- activePolicyThread = new Thread(runnable);
- activePolicyThread.start();
- }
- }
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (activePolicyThread != null) {
- activePolicyThread.interrupt();
- }
-
- emitWindow();
- }
-
- @Override
- public void dispose() {
- if (activePolicyThread != null) {
- activePolicyThread.interrupt();
- }
- }
-
- /**
- * This class allows the active trigger thread to call back and push fake
- * elements at any time.
- */
- private class WindowingCallback implements ActiveTriggerCallback {
-
- @Override
- public void sendFakeElement(Object datapoint) {
- triggerOnFakeElement(datapoint);
- }
-
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof StreamDiscretizer)
- || (other instanceof GroupedStreamDiscretizer)) {
- return false;
- } else {
- try {
- @SuppressWarnings("unchecked")
- StreamDiscretizer<IN> otherDiscretizer = (StreamDiscretizer<IN>) other;
-
- return triggerPolicy.equals(otherDiscretizer.triggerPolicy)
- && evictionPolicy.equals(otherDiscretizer.evictionPolicy);
-
- } catch (ClassCastException e) {
- return false;
- }
- }
- }
-
- @Override
- public String toString() {
- return "Discretizer(Trigger: " + triggerPolicy.toString() + ", Eviction: "
- + evictionPolicy.toString() + ")";
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
deleted file mode 100644
index c057f91..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * This operator manages the window buffers attached to the discretizers.
- */
-public class StreamWindowBuffer<T>
- extends AbstractStreamOperator<StreamWindow<T>>
- implements OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> {
-
- private static final long serialVersionUID = 1L;
-
- protected WindowBuffer<T> buffer;
-
- public StreamWindowBuffer(WindowBuffer<T> buffer) {
- this.buffer = buffer;
- setChainingStrategy(ChainingStrategy.FORCE_ALWAYS);
- disableInputCopy();
- }
-
- @Override
- public void processElement(StreamRecord<WindowEvent<T>> windowEvent) throws Exception {
- handleWindowEvent(windowEvent.getValue());
- }
-
- protected void handleWindowEvent(WindowEvent<T> windowEvent, WindowBuffer<T> buffer)
- throws Exception {
- if (windowEvent.isElement()) {
- buffer.store(windowEvent.getElement());
- } else if (windowEvent.isEviction()) {
- buffer.evict(windowEvent.getEviction());
- } else if (windowEvent.isTrigger()) {
- buffer.emitWindow(output);
- }
- }
-
- private void handleWindowEvent(WindowEvent<T> windowEvent) throws Exception {
- handleWindowEvent(windowEvent, buffer);
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
deleted file mode 100644
index fa7696a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * This operator flattens the results of the window transformations by
- * outputing the elements of the {@link StreamWindow} one-by-one
- */
-public class WindowFlattener<T> extends AbstractStreamOperator<T>
- implements OneInputStreamOperator<StreamWindow<T>, T> {
-
- private static final long serialVersionUID = 1L;
-
- public WindowFlattener() {
- chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
- disableInputCopy();
- }
-
- @Override
- public void processElement(StreamRecord<StreamWindow<T>> window) throws Exception {
- for (T element : window.getValue()) {
- output.collect(new StreamRecord<T>(element));
- }
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
deleted file mode 100644
index cdfc35b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This operator is used to apply foldWindow transformations on
- * {@link WindowedDataStream}s.
- */
-public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWindow<OUT>> {
-
- private static final long serialVersionUID = 1L;
-
- FoldFunction<IN, OUT> folder;
-
- public WindowFolder(FoldFunction<IN, OUT> folder, OUT initialValue) {
- super(new WindowFoldFunction<IN, OUT>(folder, initialValue));
- this.folder = folder;
- disableInputCopy();
- }
-
- private static class WindowFoldFunction<IN, OUT> extends AbstractRichFunction implements
- MapFunction<StreamWindow<IN>, StreamWindow<OUT>> {
-
- private static final long serialVersionUID = 1L;
- private OUT initialValue;
- FoldFunction<IN, OUT> folder;
-
- public WindowFoldFunction(FoldFunction<IN, OUT> folder, OUT initialValue) {
- this.folder = folder;
- this.initialValue = initialValue;
- }
-
- @Override
- public StreamWindow<OUT> map(StreamWindow<IN> window) throws Exception {
- StreamWindow<OUT> outputWindow = new StreamWindow<OUT>(window.windowID);
- outputWindow.numberOfParts = window.numberOfParts;
-
- if (!window.isEmpty()) {
- OUT accumulator = initialValue;
- for (int i = 0; i < window.size(); i++) {
- accumulator = folder.fold(accumulator, window.get(i));
- }
- outputWindow.add(accumulator);
- }
- return outputWindow;
- }
-
- // --------------------------------------------------------------------------------------------
- // Forwarding calls to the wrapped folder
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void open(Configuration parameters) throws Exception {
- FunctionUtils.openFunction(folder, parameters);
- }
-
- @Override
- public void close() throws Exception {
- FunctionUtils.closeFunction(folder);
- }
-
- @Override
- public void setRuntimeContext(RuntimeContext t) {
- FunctionUtils.setFunctionRuntimeContext(folder, t);
- }
-
- @Override
- public RuntimeContext getRuntimeContext() {
- return FunctionUtils.getFunctionRuntimeContext(folder, getRuntimeContext());
- }
-
- // streaming does not use iteration runtime context, so that is omitted
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
deleted file mode 100644
index ec4309d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This operator is used to apply mapWindow transformations on
- * {@link WindowedDataStream}s.
- */
-public class WindowMapper<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWindow<OUT>> {
-
- private static final long serialVersionUID = 1L;
-
- WindowMapFunction<IN, OUT> mapper;
-
- public WindowMapper(WindowMapFunction<IN, OUT> mapper) {
- super(new WindowMap<IN, OUT>(mapper));
- this.mapper = mapper;
- disableInputCopy();
- }
-
- private static class WindowMap<T, R> extends AbstractRichFunction
- implements MapFunction<StreamWindow<T>, StreamWindow<R>> {
-
- private static final long serialVersionUID = 1L;
- WindowMapFunction<T, R> mapper;
-
- public WindowMap(WindowMapFunction<T, R> mapper) {
- this.mapper = mapper;
- }
-
- @Override
- public StreamWindow<R> map(StreamWindow<T> window) throws Exception {
- StreamWindow<R> outputWindow = new StreamWindow<R>(window.windowID);
-
- outputWindow.numberOfParts = window.numberOfParts;
-
- mapper.mapWindow(window, outputWindow);
-
- return outputWindow;
- }
-
- // --------------------------------------------------------------------------------------------
- // Forwarding calls to the wrapped mapper
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void open(Configuration parameters) throws Exception {
- FunctionUtils.openFunction(mapper, parameters);
- }
-
- @Override
- public void close() throws Exception {
- FunctionUtils.closeFunction(mapper);
- }
-
- @Override
- public void setRuntimeContext(RuntimeContext t) {
- FunctionUtils.setFunctionRuntimeContext(mapper, t);
- }
-
- @Override
- public RuntimeContext getRuntimeContext() {
- return FunctionUtils.getFunctionRuntimeContext(mapper, getRuntimeContext());
- }
-
- // streaming does not use iteration runtime context, so that is omitted
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
deleted file mode 100644
index 9ed5e82..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * This operator merges together the different partitions of the
- * {@link StreamWindow}s used to merge the results of parallel transformations
- * that belong in the same window.
- */
-public class WindowMerger<T> extends AbstractStreamOperator<StreamWindow<T>>
- implements OneInputStreamOperator<StreamWindow<T>, StreamWindow<T>> {
-
- private static final long serialVersionUID = 1L;
-
- private Map<Integer, StreamWindow<T>> windows;
-
- public WindowMerger() {
- this.windows = new HashMap<Integer, StreamWindow<T>>();
- chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
- disableInputCopy();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void processElement(StreamRecord<StreamWindow<T>> nextWindowRecord) throws Exception {
- StreamWindow<T> nextWindow = nextWindowRecord.getValue();
-
- StreamWindow<T> current = windows.get(nextWindow.windowID);
-
- if (current == null) {
- current = nextWindow;
- } else {
- current = StreamWindow.merge(current, nextWindow);
- }
-
- if (current.numberOfParts == 1) {
- nextWindowRecord.replace(current);
- output.collect(nextWindowRecord);
- windows.remove(nextWindow.windowID);
- } else {
- windows.put(nextWindow.windowID, current);
- }
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartExtractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartExtractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartExtractor.java
deleted file mode 100644
index 50b4e7d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartExtractor.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.util.Collector;
-
-/**
- * This FlatMapFunction is used to send the number of parts for each window ID
- * (for each parallel discretizer) to the parallel merger that will use is to
- * merge parallel discretized windows
- */
-public class WindowPartExtractor<OUT> implements FlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>> {
-
- private static final long serialVersionUID = 1L;
-
- Integer lastIndex = -1;
-
- @Override
- public void flatMap(StreamWindow<OUT> value, Collector<Tuple2<Integer, Integer>> out)
- throws Exception {
-
- // We dont emit new values for the same index, this avoids sending the
- // same information for the same partitioned window multiple times
- if (value.windowID != lastIndex) {
-
- // For empty windows we send 0 since these windows will be filtered
- // out
- if (value.isEmpty()) {
- out.collect(new Tuple2<Integer, Integer>(value.windowID, 0));
- } else {
- out.collect(new Tuple2<Integer, Integer>(value.windowID, value.numberOfParts));
- }
- lastIndex = value.windowID;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
deleted file mode 100644
index 9f31fa0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * This operator applies either split or key partitioning depending on the
- * transformation.
- */
-public class WindowPartitioner<T> extends AbstractStreamOperator<StreamWindow<T>>
- implements OneInputStreamOperator<StreamWindow<T>, StreamWindow<T>> {
-
- private static final long serialVersionUID = 1L;
-
- private KeySelector<T, ?> keySelector;
- private int numberOfSplits;
-
- public WindowPartitioner(KeySelector<T, ?> keySelector) {
- this.keySelector = keySelector;
-
- chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
- disableInputCopy();
- }
-
- public WindowPartitioner(int numberOfSplits) {
- this.numberOfSplits = numberOfSplits;
-
- chainingStrategy = ChainingStrategy.ALWAYS;
- }
-
- @Override
- public void processElement(StreamRecord<StreamWindow<T>> currentWindow) throws Exception {
-
- if (keySelector == null) {
- if (numberOfSplits <= 1) {
- output.collect(currentWindow);
- } else {
- StreamWindow<T> unpackedWindow = currentWindow.getValue();
- for (StreamWindow<T> window : StreamWindow.split(unpackedWindow, numberOfSplits)) {
- currentWindow.replace(window);
- output.collect(currentWindow);
- }
- }
- } else {
-
- for (StreamWindow<T> window : StreamWindow
- .partitionBy(currentWindow.getValue(), keySelector, true)) {
- output.collect(new StreamRecord<StreamWindow<T>>(window));
- }
-
- }
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
deleted file mode 100644
index a43405e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This operator is used to apply reduceWindow transformations on
- * {@link WindowedDataStream}s.
- */
-public class WindowReducer<IN> extends StreamMap<StreamWindow<IN>, StreamWindow<IN>> {
-
- private static final long serialVersionUID = 1L;
-
- ReduceFunction<IN> reducer;
-
- public WindowReducer(ReduceFunction<IN> reducer) {
- super(new WindowReduceFunction<IN>(reducer));
- this.reducer = reducer;
- disableInputCopy();
- }
-
- private static class WindowReduceFunction<T> extends AbstractRichFunction implements
- MapFunction<StreamWindow<T>, StreamWindow<T>> {
-
- private static final long serialVersionUID = 1L;
- ReduceFunction<T> reducer;
-
- public WindowReduceFunction(ReduceFunction<T> reducer) {
- this.reducer = reducer;
- }
-
- @Override
- public StreamWindow<T> map(StreamWindow<T> window) throws Exception {
- StreamWindow<T> outputWindow = new StreamWindow<T>(window.windowID);
- outputWindow.numberOfParts = window.numberOfParts;
-
- if (!window.isEmpty()) {
- T reduced = window.get(0);
- for (int i = 1; i < window.size(); i++) {
- reduced = reducer.reduce(reduced, window.get(i));
- }
- outputWindow.add(reduced);
- }
- return outputWindow;
- }
-
- // --------------------------------------------------------------------------------------------
- // Forwarding calls to the wrapped reducer
- // --------------------------------------------------------------------------------------------
-
-
- @Override
- public void open(Configuration parameters) throws Exception {
- FunctionUtils.openFunction(reducer, parameters);
- }
-
- @Override
- public void close() throws Exception {
- FunctionUtils.closeFunction(reducer);
- }
-
- @Override
- public void setRuntimeContext(RuntimeContext t) {
- FunctionUtils.setFunctionRuntimeContext(reducer, t);
- }
-
- @Override
- public RuntimeContext getRuntimeContext() {
- return FunctionUtils.getFunctionRuntimeContext(reducer, getRuntimeContext());
- }
-
- // streaming does not use iteration runtime context, so that is omitted
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
deleted file mode 100644
index 5a63940..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.util.Collector;
-
-/**
- * Core abstraction for representing windows for {@link WindowedDataStream}s.
- * The user can apply transformations on these windows with the appropriate
- * {@link WindowedDataStream} methods. </p> Each stream window consists of a
- * random ID, a number representing the number of partitions for this specific
- * window (ID) and the elements itself. The ID and number of parts will be used
- * to merge the subwindows after distributed transformations.
- */
-public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
-
- private static final long serialVersionUID = -5150196421193988403L;
- private static Random rnd = new Random();
-
- public int windowID;
- public int numberOfParts;
-
- /**
- * Creates a new window with a random id
- */
- public StreamWindow() {
- this(rnd.nextInt(), 1);
- }
-
- /**
- * Creates a new window with the specific id
- *
- * @param windowID
- * ID of the window
- */
- public StreamWindow(int windowID) {
- this(windowID, 1);
- }
-
- /**
- * Creates a new window with the given id and number of parts
- *
- * @param windowID
- * @param numberOfParts
- */
- public StreamWindow(int windowID, int numberOfParts) {
- super();
- this.windowID = windowID;
- this.numberOfParts = numberOfParts;
- }
-
- /**
- * Creates a shallow copy of the window
- *
- * @param window
- * The window to be copied
- */
- public StreamWindow(StreamWindow<T> window) {
- this(window.windowID, window.numberOfParts);
- addAll(window);
- }
-
- /**
- * Creates a deep copy of the window using the given serializer
- *
- * @param window
- * The window to be copied
- * @param serializer
- * The serializer used for copying the records.
- */
- public StreamWindow(StreamWindow<T> window, TypeSerializer<T> serializer) {
- this(window.windowID, window.numberOfParts);
- for (T element : window) {
- add(serializer.copy(element));
- }
- }
-
- /**
- * Partitions the window using the given keyselector. A subwindow will be
- * created for each key.
- *
- * @param streamWindow
- * StreamWindow instance to partition
- * @param keySelector
- * The keyselector used for extracting keys.
- * @param withKey
- * Flag to decide whether the key object should be included in
- * the created window
- * @return A list of the subwindows
- */
- public static <X> List<StreamWindow<X>> partitionBy(StreamWindow<X> streamWindow,
- KeySelector<X, ?> keySelector, boolean withKey) throws Exception {
- Map<Object, StreamWindow<X>> partitions = new HashMap<Object, StreamWindow<X>>();
-
- for (X value : streamWindow) {
- Object key = keySelector.getKey(value);
- StreamWindow<X> window = partitions.get(key);
- if (window == null) {
- window = new StreamWindow<X>(streamWindow.windowID, 0);
- partitions.put(key, window);
- }
- window.add(value);
- }
-
- List<StreamWindow<X>> output = new ArrayList<StreamWindow<X>>();
- int numkeys = partitions.size();
-
- for (StreamWindow<X> window : partitions.values()) {
- output.add(window.setNumberOfParts(numkeys));
- }
-
- return output;
- }
-
- /**
- * Splits the window into n equal (if possible) sizes.
- *
- * @param window
- * Window to split
- * @param n
- * Number of desired partitions
- * @return The list of subwindows.
- */
- public static <X> List<StreamWindow<X>> split(StreamWindow<X> window, int n) {
- int numElements = window.size();
- if (n == 0) {
- return new ArrayList<StreamWindow<X>>();
- }
- if (n > numElements) {
- return split(window, numElements);
- } else {
- List<StreamWindow<X>> splitsList = new ArrayList<StreamWindow<X>>();
- int splitSize = numElements / n;
-
- int index = -1;
-
- StreamWindow<X> currentSubWindow = new StreamWindow<X>(window.windowID, n);
- splitsList.add(currentSubWindow);
-
- for (X element : window) {
- index++;
- if (index == splitSize && splitsList.size() < n) {
- currentSubWindow = new StreamWindow<X>(window.windowID, n);
- splitsList.add(currentSubWindow);
- index = 0;
- }
- currentSubWindow.add(element);
- }
- return splitsList;
- }
- }
-
- public StreamWindow<T> setNumberOfParts(int n) {
- this.numberOfParts = n;
- return this;
- }
-
- public void setID(int id) {
- this.windowID = id;
- }
-
- /**
- * Checks whether this window can be merged with the given one.
- *
- * @param otherWindow
- * The window to test
- * @return Window compatibility
- */
- public boolean compatibleWith(StreamWindow<T> otherWindow) {
- return this.windowID == otherWindow.windowID && this.numberOfParts > 1;
- }
-
- /**
- * Merges compatible windows together.
- *
- * @param windows
- * Windows to merge
- * @return Merged window
- */
- public static <R> StreamWindow<R> merge(StreamWindow<R>... windows) {
- StreamWindow<R> window = new StreamWindow<R>(windows[0]);
- for (int i = 1; i < windows.length; i++) {
- StreamWindow<R> next = windows[i];
- if (window.compatibleWith(next)) {
- window.addAll(next);
- window.numberOfParts--;
- } else {
- throw new RuntimeException("Can only merge compatible windows");
- }
- }
- return window;
- }
-
- /**
- * Merges compatible windows together.
- *
- * @param windows
- * Windows to merge
- * @return Merged window
- */
- public static <R> StreamWindow<R> merge(List<StreamWindow<R>> windows) {
- if (windows.isEmpty()) {
- throw new RuntimeException("Need at least one window to merge");
- } else {
- StreamWindow<R> window = new StreamWindow<R>(windows.get(0));
- for (int i = 1; i < windows.size(); i++) {
- StreamWindow<R> next = windows.get(i);
- if (window.compatibleWith(next)) {
- window.addAll(next);
- window.numberOfParts--;
- } else {
- throw new RuntimeException("Can only merge compatible windows");
- }
- }
- return window;
- }
- }
-
- @Override
- public boolean equals(Object o) {
- return super.equals(o);
- }
-
- @Override
- public void collect(T record) {
- add(record);
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public String toString() {
- return super.toString();
- }
-
- /**
- * Creates a new {@link StreamWindow} with random id from the given elements
- *
- * @param elements
- * The elements contained in the resulting window
- * @return The window
- */
- public static <R> StreamWindow<R> fromElements(R... elements) {
- StreamWindow<R> window = new StreamWindow<R>();
- for (R element : elements) {
- window.add(element);
- }
- return window;
- }
-}