You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:48 UTC
[32/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java
deleted file mode 100644
index 8429889..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-public class StreamingOperatorMetrics {
-
-
- public void incrementLateElementDiscarded() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
deleted file mode 100644
index 50d1cb6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-/**
- * This interface must be implemented by objects that are triggered by the timer service available
- * to stream operators in {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}.
- */
-public interface Triggerable {
-
- /**
- * This method is invoked with the timestamp for which the trigger was scheduled.
- * <p>
- * If the triggering is delayed for whatever reason (trigger timer was blocked, JVM stalled due
- * to a garbage collection), the timestamp supplied to this function will still be the original
- * timestamp for which the trigger was scheduled.
- *
- * @param timestamp The timestamp for which the trigger event was scheduled.
- */
- void trigger(long timestamp) throws Exception ;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java
deleted file mode 100644
index 5fe6873..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains the operators that perform the stream transformations.
- * One or more operators are bundled into a "chain" and executed in a stream task.
- */
-package org.apache.flink.streaming.runtime.operators;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
deleted file mode 100644
index 3165f88..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.commons.math3.util.ArithmeticUtils;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.MathUtils;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-
-import static java.util.Objects.requireNonNull;
-
-public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, STATE, F extends Function>
- extends AbstractUdfStreamOperator<OUT, F>
- implements OneInputStreamOperator<IN, OUT>, Triggerable {
-
- private static final long serialVersionUID = 3245500864882459867L;
-
- private static final long MIN_SLIDE_TIME = 50;
-
- // ----- fields for operator parametrization -----
-
- private final Function function;
- private final KeySelector<IN, KEY> keySelector;
-
- private final TypeSerializer<KEY> keySerializer;
- private final TypeSerializer<STATE> stateTypeSerializer;
-
- private final long windowSize;
- private final long windowSlide;
- private final long paneSize;
- private final int numPanesPerWindow;
-
- // ----- fields for operator functionality -----
-
- private transient AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
-
- private transient TimestampedCollector<OUT> out;
-
- private transient RestoredState<IN, KEY, STATE, OUT> restoredState;
-
- private transient long nextEvaluationTime;
- private transient long nextSlideTime;
-
- protected AbstractAlignedProcessingTimeWindowOperator(
- F function,
- KeySelector<IN, KEY> keySelector,
- TypeSerializer<KEY> keySerializer,
- TypeSerializer<STATE> stateTypeSerializer,
- long windowLength,
- long windowSlide)
- {
- super(function);
-
- if (windowLength < MIN_SLIDE_TIME) {
- throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs");
- }
- if (windowSlide < MIN_SLIDE_TIME) {
- throw new IllegalArgumentException("Window slide must be at least " + MIN_SLIDE_TIME + " msecs");
- }
- if (windowLength < windowSlide) {
- throw new IllegalArgumentException("The window size must be larger than the window slide");
- }
-
- final long paneSlide = ArithmeticUtils.gcd(windowLength, windowSlide);
- if (paneSlide < MIN_SLIDE_TIME) {
- throw new IllegalArgumentException(String.format(
- "Cannot compute window of size %d msecs sliding by %d msecs. " +
- "The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide));
- }
-
- this.function = requireNonNull(function);
- this.keySelector = requireNonNull(keySelector);
- this.keySerializer = requireNonNull(keySerializer);
- this.stateTypeSerializer = requireNonNull(stateTypeSerializer);
- this.windowSize = windowLength;
- this.windowSlide = windowSlide;
- this.paneSize = paneSlide;
- this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength / paneSlide);
- }
-
-
- protected abstract AbstractKeyedTimePanes<IN, KEY, STATE, OUT> createPanes(
- KeySelector<IN, KEY> keySelector, Function function);
-
- // ------------------------------------------------------------------------
- // startup and shutdown
- // ------------------------------------------------------------------------
-
- @Override
- public void open() throws Exception {
- super.open();
-
- out = new TimestampedCollector<>(output);
-
- // decide when to first compute the window and when to slide it
- // the values should align with the start of time (that is, the UNIX epoch, not the big bang)
- final long now = System.currentTimeMillis();
- nextEvaluationTime = now + windowSlide - (now % windowSlide);
- nextSlideTime = now + paneSize - (now % paneSize);
-
- final long firstTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
-
- // check if we restored state and if we need to fire some windows based on that restored state
- if (restoredState == null) {
- // initial empty state: create empty panes that gather the elements per slide
- panes = createPanes(keySelector, function);
- }
- else {
- // restored state
- panes = restoredState.panes;
-
- long nextPastEvaluationTime = restoredState.nextEvaluationTime;
- long nextPastSlideTime = restoredState.nextSlideTime;
- long nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime);
- int numPanesRestored = panes.getNumPanes();
-
- // fire windows from the past as long as there are more panes with data and as long
- // as the missed trigger times have not caught up with the presence
- while (numPanesRestored > 0 && nextPastTriggerTime < firstTriggerTime) {
- // evaluate the window from the past
- if (nextPastTriggerTime == nextPastEvaluationTime) {
- computeWindow(nextPastTriggerTime);
- nextPastEvaluationTime += windowSlide;
- }
-
- // evaluate slide from the past
- if (nextPastTriggerTime == nextPastSlideTime) {
- panes.slidePanes(numPanesPerWindow);
- numPanesRestored--;
- nextPastSlideTime += paneSize;
- }
-
- nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime);
- }
- }
-
- // make sure the first window happens
- registerTimer(firstTriggerTime, this);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
-
- final long finalWindowTimestamp = nextEvaluationTime;
-
- // early stop the triggering thread, so it does not attempt to return any more data
- stopTriggers();
-
- // emit the remaining data
- computeWindow(finalWindowTimestamp);
- }
-
- @Override
- public void dispose() {
- super.dispose();
-
- // acquire the lock during shutdown, to prevent trigger calls at the same time
- // fail-safe stop of the triggering thread (in case of an error)
- stopTriggers();
-
- // release the panes. panes may still be null if dispose is called
- // after open() failed
- if (panes != null) {
- panes.dispose();
- }
- }
-
- private void stopTriggers() {
- // reset the action timestamps. this makes sure any pending triggers will not evaluate
- nextEvaluationTime = -1L;
- nextSlideTime = -1L;
- }
-
- // ------------------------------------------------------------------------
- // Receiving elements and triggers
- // ------------------------------------------------------------------------
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- panes.addElementToLatestPane(element.getValue());
- }
-
- @Override
- public void processWatermark(Watermark mark) {
- // this operator does not react to watermarks
- }
-
- @Override
- public void trigger(long timestamp) throws Exception {
- // first we check if we actually trigger the window function
- if (timestamp == nextEvaluationTime) {
- // compute and output the results
- computeWindow(timestamp);
-
- nextEvaluationTime += windowSlide;
- }
-
- // check if we slide the panes by one. this may happen in addition to the
- // window computation, or just by itself
- if (timestamp == nextSlideTime) {
- panes.slidePanes(numPanesPerWindow);
- nextSlideTime += paneSize;
- }
-
- long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
- registerTimer(nextTriggerTime, this);
- }
-
- private void computeWindow(long timestamp) throws Exception {
- out.setTimestamp(timestamp);
- panes.truncatePanes(numPanesPerWindow);
- panes.evaluateWindow(out, new TimeWindow(timestamp, timestamp + windowSize));
- }
-
- // ------------------------------------------------------------------------
- // Checkpointing
- // ------------------------------------------------------------------------
-
- @Override
- public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
- StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-
- // we write the panes with the key/value maps into the stream, as well as when this state
- // should have triggered and slided
- StateBackend.CheckpointStateOutputView out =
- getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-
- out.writeLong(nextEvaluationTime);
- out.writeLong(nextSlideTime);
- panes.writeToOutput(out, keySerializer, stateTypeSerializer);
-
- taskState.setOperatorState(out.closeAndGetHandle());
- return taskState;
- }
-
- @Override
- public void restoreState(StreamTaskState taskState) throws Exception {
- super.restoreState(taskState);
-
- @SuppressWarnings("unchecked")
- StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
- DataInputView in = inputState.getState(getUserCodeClassloader());
-
- final long nextEvaluationTime = in.readLong();
- final long nextSlideTime = in.readLong();
-
- AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes = createPanes(keySelector, function);
- panes.readFromInput(in, keySerializer, stateTypeSerializer);
-
- restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime);
- }
-
- // ------------------------------------------------------------------------
- // Property access (for testing)
- // ------------------------------------------------------------------------
-
- public long getWindowSize() {
- return windowSize;
- }
-
- public long getWindowSlide() {
- return windowSlide;
- }
-
- public long getPaneSize() {
- return paneSize;
- }
-
- public int getNumPanesPerWindow() {
- return numPanesPerWindow;
- }
-
- public long getNextEvaluationTime() {
- return nextEvaluationTime;
- }
-
- public long getNextSlideTime() {
- return nextSlideTime;
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "Window (processing time) (length=" + windowSize + ", slide=" + windowSlide + ')';
- }
-
- // ------------------------------------------------------------------------
- // ------------------------------------------------------------------------
-
- private static final class RestoredState<IN, KEY, STATE, OUT> {
-
- final AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
- final long nextEvaluationTime;
- final long nextSlideTime;
-
- RestoredState(AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes, long nextEvaluationTime, long nextSlideTime) {
- this.panes = panes;
- this.nextEvaluationTime = nextEvaluationTime;
- this.nextSlideTime = nextSlideTime;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
deleted file mode 100644
index d1cea20..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-
-/**
- * Base class for a multiple key/value maps organized in panes.
- */
-public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
-
- private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 0x0FF1CE42;
-
- private static final int BEGIN_OF_PANE_MAGIC_NUMBER = 0xBADF00D5;
-
- /** The latest time pane */
- protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
-
- /** The previous time panes, ordered by time (early to late) */
- protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>();
-
- // ------------------------------------------------------------------------
-
- public abstract void addElementToLatestPane(Type element) throws Exception;
-
- public abstract void evaluateWindow(Collector<Result> out, TimeWindow window) throws Exception;
-
-
- public void dispose() {
- // since all is heap data, there is no need to clean up anything
- latestPane = null;
- previousPanes.clear();
- }
-
- public int getNumPanes() {
- return previousPanes.size() + 1;
- }
-
-
- public void slidePanes(int panesToKeep) {
- if (panesToKeep > 1) {
- // the current pane becomes the latest previous pane
- previousPanes.addLast(latestPane);
-
- // truncate the history
- while (previousPanes.size() >= panesToKeep) {
- previousPanes.removeFirst();
- }
- }
-
- // we need a new latest pane
- latestPane = new KeyMap<>();
- }
-
- public void truncatePanes(int numToRetain) {
- while (previousPanes.size() >= numToRetain) {
- previousPanes.removeFirst();
- }
- }
-
- protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traversal, long traversalPass) throws Exception{
- // gather all panes in an array (faster iterations)
- @SuppressWarnings({"unchecked", "rawtypes"})
- KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new KeyMap[previousPanes.size() + 1]);
- panes[panes.length - 1] = latestPane;
-
- // let the maps make a coordinated traversal and evaluate the window function per contained key
- KeyMap.traverseMaps(panes, traversal, traversalPass);
- }
-
- // ------------------------------------------------------------------------
- // Serialization and de-serialization
- // ------------------------------------------------------------------------
-
- public void writeToOutput(
- final DataOutputView output,
- final TypeSerializer<Key> keySerializer,
- final TypeSerializer<Aggregate> aggSerializer) throws IOException
- {
- output.writeInt(BEGIN_OF_STATE_MAGIC_NUMBER);
-
- int numPanes = getNumPanes();
- output.writeInt(numPanes);
-
- // write from the past
- Iterator<KeyMap<Key, Aggregate>> previous = previousPanes.iterator();
- for (int paneNum = 0; paneNum < numPanes; paneNum++) {
- output.writeInt(BEGIN_OF_PANE_MAGIC_NUMBER);
- KeyMap<Key, Aggregate> pane = (paneNum == numPanes - 1) ? latestPane : previous.next();
-
- output.writeInt(pane.size());
- for (KeyMap.Entry<Key, Aggregate> entry : pane) {
- keySerializer.serialize(entry.getKey(), output);
- aggSerializer.serialize(entry.getValue(), output);
- }
- }
- }
-
- public void readFromInput(
- final DataInputView input,
- final TypeSerializer<Key> keySerializer,
- final TypeSerializer<Aggregate> aggSerializer) throws IOException
- {
- validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, input.readInt());
- int numPanes = input.readInt();
-
- // read from the past towards the presence
- while (numPanes > 0) {
- validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, input.readInt());
- KeyMap<Key, Aggregate> pane = (numPanes == 1) ? latestPane : new KeyMap<Key, Aggregate>();
-
- final int numElementsInPane = input.readInt();
- for (int i = numElementsInPane - 1; i >= 0; i--) {
- Key k = keySerializer.deserialize(input);
- Aggregate a = aggSerializer.deserialize(input);
- pane.put(k, a);
- }
-
- if (numPanes > 1) {
- previousPanes.addLast(pane);
- }
- numPanes--;
- }
- }
-
- private static void validateMagicNumber(int expected, int found) throws IOException {
- if (expected != found) {
- throw new IOException("Corrupt state stream - wrong magic number. " +
- "Expected '" + Integer.toHexString(expected) +
- "', found '" + Integer.toHexString(found) + '\'');
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
deleted file mode 100644
index c854e6c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.util.UnionIterator;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-
-
-public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> {
-
- private final KeySelector<Type, Key> keySelector;
-
- private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
-
- private final WindowFunction<Type, Result, Key, Window> function;
-
- /**
- * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
- private long evaluationPass = 1L;
-
- // ------------------------------------------------------------------------
-
- public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Type, Result, Key, Window> function) {
- this.keySelector = keySelector;
- this.function = function;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public void addElementToLatestPane(Type element) throws Exception {
- Key k = keySelector.getKey(element);
- ArrayList<Type> elements = latestPane.putIfAbsent(k, listFactory);
- elements.add(element);
- }
-
- @Override
- public void evaluateWindow(Collector<Result> out, TimeWindow window) throws Exception {
- if (previousPanes.isEmpty()) {
- // optimized path for single pane case (tumbling window)
- for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
- function.apply(entry.getKey(), window, entry.getValue(), out);
- }
- }
- else {
- // general code path for multi-pane case
- WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(function, window, out);
- traverseAllPanes(evaluator, evaluationPass);
- }
-
- evaluationPass++;
- }
-
- // ------------------------------------------------------------------------
- // Running a window function in a map traversal
- // ------------------------------------------------------------------------
-
- static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
-
- private final WindowFunction<Type, Result, Key, Window> function;
-
- private final UnionIterator<Type> unionIterator;
-
- private final Collector<Result> out;
-
- private Key currentKey;
-
- private TimeWindow window;
-
- WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window, Collector<Result> out) {
- this.function = function;
- this.out = out;
- this.unionIterator = new UnionIterator<>();
- this.window = window;
- }
-
-
- @Override
- public void startNewKey(Key key) {
- unionIterator.clear();
- currentKey = key;
- }
-
- @Override
- public void nextValue(ArrayList<Type> value) {
- unionIterator.addList(value);
- }
-
- @Override
- public void keyDone() throws Exception {
- function.apply(currentKey, window, unionIterator, out);
- }
- }
-
- // ------------------------------------------------------------------------
- // Lazy factory for lists (put if absent)
- // ------------------------------------------------------------------------
-
- @SuppressWarnings("unchecked")
- private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() {
- return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
- }
-
- private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() {
-
- @Override
- public ArrayList<?> create() {
- return new ArrayList<>(4);
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
deleted file mode 100644
index 7a7d04c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-
-public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
- extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
-
- private static final long serialVersionUID = 7305948082830843475L;
-
-
- public AccumulatingProcessingTimeWindowOperator(
- WindowFunction<IN, OUT, KEY, TimeWindow> function,
- KeySelector<IN, KEY> keySelector,
- TypeSerializer<KEY> keySerializer,
- TypeSerializer<IN> valueSerializer,
- long windowLength,
- long windowSlide)
- {
- super(function, keySelector, keySerializer,
- new ArrayListSerializer<IN>(valueSerializer), windowLength, windowSlide);
- }
-
- @Override
- protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
- @SuppressWarnings("unchecked")
- WindowFunction<IN, OUT, KEY, Window> windowFunction = (WindowFunction<IN, OUT, KEY, Window>) function;
-
- return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
- }
-
- // ------------------------------------------------------------------------
- // Utility Serializer for Lists of Elements
- // ------------------------------------------------------------------------
-
- @SuppressWarnings("ForLoopReplaceableByForEach")
- private static final class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
-
- private static final long serialVersionUID = 1119562170939152304L;
-
- private final TypeSerializer<T> elementSerializer;
-
- ArrayListSerializer(TypeSerializer<T> elementSerializer) {
- this.elementSerializer = elementSerializer;
- }
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public TypeSerializer<ArrayList<T>> duplicate() {
- TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
- return duplicateElement == elementSerializer ? this : new ArrayListSerializer<T>(duplicateElement);
- }
-
- @Override
- public ArrayList<T> createInstance() {
- return new ArrayList<>();
- }
-
- @Override
- public ArrayList<T> copy(ArrayList<T> from) {
- ArrayList<T> newList = new ArrayList<>(from.size());
- for (int i = 0; i < from.size(); i++) {
- newList.add(elementSerializer.copy(from.get(i)));
- }
- return newList;
- }
-
- @Override
- public ArrayList<T> copy(ArrayList<T> from, ArrayList<T> reuse) {
- return copy(from);
- }
-
- @Override
- public int getLength() {
- return -1; // var length
- }
-
- @Override
- public void serialize(ArrayList<T> list, DataOutputView target) throws IOException {
- final int size = list.size();
- target.writeInt(size);
- for (int i = 0; i < size; i++) {
- elementSerializer.serialize(list.get(i), target);
- }
- }
-
- @Override
- public ArrayList<T> deserialize(DataInputView source) throws IOException {
- final int size = source.readInt();
- final ArrayList<T> list = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- list.add(elementSerializer.deserialize(source));
- }
- return list;
- }
-
- @Override
- public ArrayList<T> deserialize(ArrayList<T> reuse, DataInputView source) throws IOException {
- return deserialize(source);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- // copy number of elements
- final int num = source.readInt();
- target.writeInt(num);
- for (int i = 0; i < num; i++) {
- elementSerializer.copy(source, target);
- }
- }
-
- // --------------------------------------------------------------------
-
- @Override
- public boolean equals(Object obj) {
- return obj == this ||
- (obj != null && obj.getClass() == getClass() &&
- elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer));
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return true;
- }
-
- @Override
- public int hashCode() {
- return elementSerializer.hashCode();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
deleted file mode 100644
index d395b2a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-
-public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> {
-
- private final KeySelector<Type, Key> keySelector;
-
- private final ReduceFunction<Type> reducer;
-
- /**
- * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
- private long evaluationPass = 1L;
-
- // ------------------------------------------------------------------------
-
- public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) {
- this.keySelector = keySelector;
- this.reducer = reducer;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public void addElementToLatestPane(Type element) throws Exception {
- Key k = keySelector.getKey(element);
- latestPane.putOrAggregate(k, element, reducer);
- }
-
- @Override
- public void evaluateWindow(Collector<Type> out, TimeWindow window) throws Exception {
- if (previousPanes.isEmpty()) {
- // optimized path for single pane case
- for (KeyMap.Entry<Key, Type> entry : latestPane) {
- out.collect(entry.getValue());
- }
- }
- else {
- // general code path for multi-pane case
- AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out);
- traverseAllPanes(evaluator, evaluationPass);
- }
-
- evaluationPass++;
- }
-
- // ------------------------------------------------------------------------
- // The maps traversal that performs the final aggregation
- // ------------------------------------------------------------------------
-
- static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> {
-
- private final ReduceFunction<Type> function;
-
- private final Collector<Type> out;
-
- private Type currentValue;
-
- AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out) {
- this.function = function;
- this.out = out;
- }
-
- @Override
- public void startNewKey(Key key) {
- currentValue = null;
- }
-
- @Override
- public void nextValue(Type value) throws Exception {
- if (currentValue != null) {
- currentValue = function.reduce(currentValue, value);
- }
- else {
- currentValue = value;
- }
- }
-
- @Override
- public void keyDone() throws Exception {
- out.collect(currentValue);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
deleted file mode 100644
index 0e07cea..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-
-public class AggregatingProcessingTimeWindowOperator<KEY, IN>
- extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, IN, ReduceFunction<IN>> {
-
- private static final long serialVersionUID = 7305948082830843475L;
-
-
- public AggregatingProcessingTimeWindowOperator(
- ReduceFunction<IN> function,
- KeySelector<IN, KEY> keySelector,
- TypeSerializer<KEY> keySerializer,
- TypeSerializer<IN> aggregateSerializer,
- long windowLength,
- long windowSlide)
- {
- super(function, keySelector, keySerializer, aggregateSerializer, windowLength, windowSlide);
- }
-
- @Override
- protected AggregatingKeyedTimePanes<IN, KEY> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
- @SuppressWarnings("unchecked")
- ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) function;
-
- return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, windowFunction);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
deleted file mode 100644
index 1bb451a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Evicting window operator for non-keyed windows.
- *
- * @see org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator
- *
- * @param <IN> The type of the incoming elements.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
- */
-public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends NonKeyedWindowOperator<IN, OUT, W> {
-
- private static final long serialVersionUID = 1L;
-
- private final Evictor<? super IN, ? super W> evictor;
-
- public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
- TypeSerializer<W> windowSerializer,
- WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
- AllWindowFunction<IN, OUT, W> windowFunction,
- Trigger<? super IN, ? super W> trigger,
- Evictor<? super IN, ? super W> evictor) {
- super(windowAssigner, windowSerializer, windowBufferFactory, windowFunction, trigger);
- this.evictor = requireNonNull(evictor);
- }
-
- @Override
- @SuppressWarnings("unchecked, rawtypes")
- protected void emitWindow(Context context) throws Exception {
- timestampedCollector.setTimestamp(context.window.maxTimestamp());
- EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) context.windowBuffer;
-
- int toEvict = 0;
- if (windowBuffer.size() > 0) {
- // need some type trickery here...
- toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), context.window);
- }
-
- windowBuffer.removeElements(toEvict);
-
- userFunction.apply(
- context.window,
- context.windowBuffer.getUnpackedElements(),
- timestampedCollector);
- }
-
- @Override
- public EvictingNonKeyedWindowOperator<IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
- super.enableSetProcessingTime(setProcessingTime);
- return this;
- }
-
-
- // ------------------------------------------------------------------------
- // Getters for testing
- // ------------------------------------------------------------------------
-
- @VisibleForTesting
- public Evictor<? super IN, ? super W> getEvictor() {
- return evictor;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
deleted file mode 100644
index ad43812..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A {@link WindowOperator} that also allows an {@link Evictor} to be used.
- *
- * <p>
- * The {@code Evictor} is used to evict elements from panes before processing a window and after
- * a {@link Trigger} has fired.
- *
- * @param <K> The type of key returned by the {@code KeySelector}.
- * @param <IN> The type of the incoming elements.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
- */
-public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, OUT, W> {
-
- private static final long serialVersionUID = 1L;
-
- private final Evictor<? super IN, ? super W> evictor;
-
- public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
- TypeSerializer<W> windowSerializer,
- KeySelector<IN, K> keySelector,
- TypeSerializer<K> keySerializer,
- WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
- WindowFunction<IN, OUT, K, W> windowFunction,
- Trigger<? super IN, ? super W> trigger,
- Evictor<? super IN, ? super W> evictor) {
- super(windowAssigner, windowSerializer, keySelector, keySerializer, windowBufferFactory, windowFunction, trigger);
- this.evictor = requireNonNull(evictor);
- }
-
- @Override
- @SuppressWarnings("unchecked, rawtypes")
- protected void emitWindow(Context context) throws Exception {
- timestampedCollector.setTimestamp(context.window.maxTimestamp());
- EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) context.windowBuffer;
-
- int toEvict = 0;
- if (windowBuffer.size() > 0) {
- // need some type trickery here...
- toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), context.window);
- }
-
- windowBuffer.removeElements(toEvict);
-
- userFunction.apply(context.key,
- context.window,
- context.windowBuffer.getUnpackedElements(),
- timestampedCollector);
- }
-
- @Override
- public EvictingWindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
- super.enableSetProcessingTime(setProcessingTime);
- return this;
- }
-
-
- // ------------------------------------------------------------------------
- // Getters for testing
- // ------------------------------------------------------------------------
-
- @VisibleForTesting
- public Evictor<? super IN, ? super W> getEvictor() {
- return evictor;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
deleted file mode 100644
index 3f44c4a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
+++ /dev/null
@@ -1,651 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.runtime.util.MathUtils;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-/**
- * A special Hash Map implementation that can be traversed efficiently in sync with other
- * hash maps.
- * <p>
- * The differences between this hash map and Java's "java.util.HashMap" are:
- * <ul>
- * <li>A different hashing scheme. This implementation uses extensible hashing, meaning that
- * each hash table growth takes one more lower hash code bit into account, and values that where
- * formerly in the same bucket will afterwards be in the two adjacent buckets.</li>
- * <li>This allows an efficient traversal of multiple hash maps together, even though the maps are
- * of different sizes.</li>
- * <li>The map offers functions such as "putIfAbsent()" and "putOrAggregate()"</li>
- * <li>The map supports no removal/shrinking.</li>
- * </ul>
- */
-public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
-
- /** The minimum table capacity, 64 entries */
- private static final int MIN_CAPACITY = 0x40;
-
- /** The maximum possible table capacity, the largest positive power of
- * two in the 32bit signed integer value range */
- private static final int MAX_CAPACITY = 0x40000000;
-
- /** The number of bits used for table addressing when table is at max capacity */
- private static final int FULL_BIT_RANGE = MathUtils.log2strict(MAX_CAPACITY);
-
- // ------------------------------------------------------------------------
-
- /** The hash index, as an array of entries */
- private Entry<K, V>[] table;
-
- /** The number of bits by which the hash code is shifted right, to find the bucket */
- private int shift;
-
- /** The number of elements in the hash table */
- private int numElements;
-
- /** The number of elements above which the hash table needs to grow */
- private int rehashThreshold;
-
- /** The base-2 logarithm of the table capacity */
- private int log2size;
-
- // ------------------------------------------------------------------------
-
- /**
- * Creates a new hash table with the default initial capacity.
- */
- public KeyMap() {
- this(0);
- }
-
- /**
- * Creates a new table with a capacity tailored to the given expected number of elements.
- *
- * @param expectedNumberOfElements The number of elements to tailor the capacity to.
- */
- public KeyMap(int expectedNumberOfElements) {
- if (expectedNumberOfElements < 0) {
- throw new IllegalArgumentException("Invalid capacity: " + expectedNumberOfElements);
- }
-
- // round up to the next power or two
- // guard against too small capacity and integer overflows
- int capacity = Integer.highestOneBit(expectedNumberOfElements) << 1;
- capacity = capacity >= 0 ? Math.max(MIN_CAPACITY, capacity) : MAX_CAPACITY;
-
- // this also acts as a sanity check
- log2size = MathUtils.log2strict(capacity);
- shift = FULL_BIT_RANGE - log2size;
- table = allocateTable(capacity);
- rehashThreshold = getRehashThreshold(capacity);
- }
-
- // ------------------------------------------------------------------------
- // Gets and Puts
- // ------------------------------------------------------------------------
-
- /**
- * Inserts the given value, mapped under the given key. If the table already contains a value for
- * the key, the value is replaced and returned. If no value is contained, yet, the function
- * returns null.
- *
- * @param key The key to insert.
- * @param value The value to insert.
- * @return The previously mapped value for the key, or null, if no value was mapped for the key.
- *
- * @throws java.lang.NullPointerException Thrown, if the key is null.
- */
- public final V put(K key, V value) {
- final int hash = hash(key);
- final int slot = indexOf (hash);
-
- // search the chain from the slot
- for (Entry<K, V> e = table[slot]; e != null; e = e.next) {
- Object k;
- if (e.hashCode == hash && ((k = e.key) == key || key.equals(k))) {
- // found match
- V old = e.value;
- e.value = value;
- return old;
- }
- }
-
- // no match, insert a new value
- insertNewEntry(hash, key, value, slot);
- return null;
- }
-
- /**
- * Inserts a value for the given key, if no value is yet contained for that key. Otherwise,
- * returns the value currently contained for the key.
- * <p>
- * The value that is inserted in case that the key is not contained, yet, is lazily created
- * using the given factory.
- *
- * @param key The key to insert.
- * @param factory The factory that produces the value, if no value is contained, yet, for the key.
- * @return The value in the map after this operation (either the previously contained value, or the
- * newly created value).
- *
- * @throws java.lang.NullPointerException Thrown, if the key is null.
- */
- public final V putIfAbsent(K key, LazyFactory<V> factory) {
- final int hash = hash(key);
- final int slot = indexOf(hash);
-
- // search the chain from the slot
- for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) {
- if (entry.hashCode == hash && entry.key.equals(key)) {
- // found match
- return entry.value;
- }
- }
-
- // no match, insert a new value
- V value = factory.create();
- insertNewEntry(hash, key, value, slot);
-
- // return the created value
- return value;
- }
-
- /**
- * Inserts or aggregates a value into the hash map. If the hash map does not yet contain the key,
- * this method inserts the value. If the table already contains the key (and a value) this
- * method will use the given ReduceFunction function to combine the existing value and the
- * given value to a new value, and store that value for the key.
- *
- * @param key The key to map the value.
- * @param value The new value to insert, or aggregate with the existing value.
- * @param aggregator The aggregator to use if a value is already contained.
- *
- * @return The value in the map after this operation: Either the given value, or the aggregated value.
- *
- * @throws java.lang.NullPointerException Thrown, if the key is null.
- * @throws Exception The method forwards exceptions from the aggregation function.
- */
- public final V putOrAggregate(K key, V value, ReduceFunction<V> aggregator) throws Exception {
- final int hash = hash(key);
- final int slot = indexOf(hash);
-
- // search the chain from the slot
- for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) {
- if (entry.hashCode == hash && entry.key.equals(key)) {
- // found match
- entry.value = aggregator.reduce(entry.value, value);
- return entry.value;
- }
- }
-
- // no match, insert a new value
- insertNewEntry(hash, key, value, slot);
- // return the original value
- return value;
- }
-
- /**
- * Looks up the value mapped under the given key. Returns null if no value is mapped under this key.
- *
- * @param key The key to look up.
- * @return The value associated with the key, or null, if no value is found for the key.
- *
- * @throws java.lang.NullPointerException Thrown, if the key is null.
- */
- public V get(K key) {
- final int hash = hash(key);
- final int slot = indexOf(hash);
-
- // search the chain from the slot
- for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) {
- if (entry.hashCode == hash && entry.key.equals(key)) {
- return entry.value;
- }
- }
-
- // not found
- return null;
- }
-
- private void insertNewEntry(int hashCode, K key, V value, int position) {
- Entry<K,V> e = table[position];
- table[position] = new Entry<>(key, value, hashCode, e);
- numElements++;
-
- // rehash if necessary
- if (numElements > rehashThreshold) {
- growTable();
- }
- }
-
- private int indexOf(int hashCode) {
- return (hashCode >> shift) & (table.length - 1);
- }
-
- /**
- * Creates an iterator over the entries of this map.
- *
- * @return An iterator over the entries of this map.
- */
- @Override
- public Iterator<Entry<K, V>> iterator() {
- return new Iterator<Entry<K, V>>() {
-
- private final Entry<K, V>[] tab = KeyMap.this.table;
-
- private Entry<K, V> nextEntry;
-
- private int nextPos = 0;
-
- @Override
- public boolean hasNext() {
- if (nextEntry != null) {
- return true;
- }
- else {
- while (nextPos < tab.length) {
- Entry<K, V> e = tab[nextPos++];
- if (e != null) {
- nextEntry = e;
- return true;
- }
- }
- return false;
- }
- }
-
- @Override
- public Entry<K, V> next() {
- if (nextEntry != null || hasNext()) {
- Entry<K, V> e = nextEntry;
- nextEntry = nextEntry.next;
- return e;
- }
- else {
- throw new NoSuchElementException();
- }
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- // ------------------------------------------------------------------------
- // Properties
- // ------------------------------------------------------------------------
-
- /**
- * Gets the number of elements currently in the map.
- * @return The number of elements currently in the map.
- */
- public int size() {
- return numElements;
- }
-
- /**
- * Checks whether the map is empty.
- * @return True, if the map is empty, false otherwise.
- */
- public boolean isEmpty() {
- return numElements == 0;
- }
-
- /**
- * Gets the current table capacity, i.e., the number of slots in the hash table, without
- * any overflow chaining.
- * @return The number of slots in the hash table.
- */
- public int getCurrentTableCapacity() {
- return table.length;
- }
-
- /**
- * Gets the base-2 logarithm of the hash table capacity, as returned by
- * {@link #getCurrentTableCapacity()}.
- *
- * @return The base-2 logarithm of the hash table capacity.
- */
- public int getLog2TableCapacity() {
- return log2size;
- }
-
- public int getRehashThreshold() {
- return rehashThreshold;
- }
-
- public int getShift() {
- return shift;
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- @SuppressWarnings("unchecked")
- private Entry<K, V>[] allocateTable(int numElements) {
- return (Entry<K, V>[]) new Entry<?, ?>[numElements];
- }
-
- private void growTable() {
- final int newSize = table.length << 1;
-
- // only grow if there is still space to grow the table
- if (newSize > 0) {
- final Entry<K, V>[] oldTable = this.table;
- final Entry<K, V>[] newTable = allocateTable(newSize);
-
- final int newShift = shift - 1;
- final int newMask = newSize - 1;
-
- // go over all slots from the table. since we hash to adjacent positions in
- // the new hash table, this is actually cache efficient
- for (Entry<K, V> entry : oldTable) {
- // traverse the chain for each slot
- while (entry != null) {
- final int newPos = (entry.hashCode >> newShift) & newMask;
- Entry<K, V> nextEntry = entry.next;
- entry.next = newTable[newPos];
- newTable[newPos] = entry;
- entry = nextEntry;
- }
- }
-
- this.table = newTable;
- this.shift = newShift;
- this.rehashThreshold = getRehashThreshold(newSize);
- this.log2size += 1;
- }
- }
-
- private static int hash(Object key) {
- int code = key.hashCode();
-
- // we need a strong hash function that generates diverse upper bits
- // this hash function is more expensive than the "scramble" used by "java.util.HashMap",
- // but required for this sort of hash table
- code = (code + 0x7ed55d16) + (code << 12);
- code = (code ^ 0xc761c23c) ^ (code >>> 19);
- code = (code + 0x165667b1) + (code << 5);
- code = (code + 0xd3a2646c) ^ (code << 9);
- code = (code + 0xfd7046c5) + (code << 3);
- return (code ^ 0xb55a4f09) ^ (code >>> 16);
- }
-
- private static int getRehashThreshold(int capacity) {
- // divide before multiply, to avoid overflow
- return capacity / 4 * 3;
- }
-
- // ------------------------------------------------------------------------
- // Testing Utilities
- // ------------------------------------------------------------------------
-
- /**
- * For testing only: Actively counts the number of entries, rather than using the
- * counter variable. This method has linear complexity, rather than constant.
- *
- * @return The counted number of entries.
- */
- int traverseAndCountElements() {
- int num = 0;
-
- for (Entry<?, ?> entry : table) {
- while (entry != null) {
- num++;
- entry = entry.next;
- }
- }
-
- return num;
- }
-
- /**
- * For testing only: Gets the length of the longest overflow chain.
- * This method has linear complexity.
- *
- * @return The length of the longest overflow chain.
- */
- int getLongestChainLength() {
- int maxLen = 0;
-
- for (Entry<?, ?> entry : table) {
- int thisLen = 0;
- while (entry != null) {
- thisLen++;
- entry = entry.next;
- }
- maxLen = Math.max(maxLen, thisLen);
- }
-
- return maxLen;
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * An entry in the hash table.
- *
- * @param <K> Type of the key.
- * @param <V> Type of the value.
- */
- public static final class Entry<K, V> {
-
- final K key;
- final int hashCode;
-
- V value;
- Entry<K, V> next;
- long touchedTag;
-
- Entry(K key, V value, int hashCode, Entry<K, V> next) {
- this.key = key;
- this.value = value;
- this.next = next;
- this.hashCode = hashCode;
- }
-
- public K getKey() {
- return key;
- }
-
- public V getValue() {
- return value;
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Performs a traversal over the logical multi-map that results from the union of the
- * given maps. This method does not actually build a union of the maps, but traverses the hash maps
- * together.
- *
- * @param maps The array of maps whose union should be traversed.
- * @param visitor The visitor that is called for each key and all values.
- * @param touchedTag A tag that is used to mark elements that have been touched in this specific
- * traversal. Each successive traversal should supply a larger value for this
- * tag than the previous one.
- *
- * @param <K> The type of the map's key.
- * @param <V> The type of the map's value.
- */
- public static <K, V> void traverseMaps(
- final KeyMap<K, V>[] maps,
- final TraversalEvaluator<K, V> visitor,
- final long touchedTag)
- throws Exception
- {
- // we need to work on the maps in descending size
- Arrays.sort(maps, CapacityDescendingComparator.INSTANCE);
-
- final int[] shifts = new int[maps.length];
- final int[] lowBitsMask = new int[maps.length];
- final int numSlots = maps[0].table.length;
- final int numTables = maps.length;
-
- // figure out how much each hash table collapses the entries
- for (int i = 0; i < numTables; i++) {
- shifts[i] = maps[0].log2size - maps[i].log2size;
- lowBitsMask[i] = (1 << shifts[i]) - 1;
- }
-
- // go over all slots (based on the largest hash table)
- for (int pos = 0; pos < numSlots; pos++) {
-
- // for each slot, go over all tables, until the table does not have that slot any more.
- // for tables where multiple slots collapse into one, we visit that one when we process the
- // last of all slots that collapse to that one
- int mask;
- for (int rootTable = 0;
- rootTable < numTables && ((mask = lowBitsMask[rootTable]) & pos) == mask;
- rootTable++)
- {
- // use that table to gather keys and start collecting keys from the following tables
- // go over all entries of that slot in the table
- Entry<K, V> entry = maps[rootTable].table[pos >> shifts[rootTable]];
- while (entry != null) {
- // take only entries that have not been collected as part of other tables
- if (entry.touchedTag < touchedTag) {
- entry.touchedTag = touchedTag;
-
- final K key = entry.key;
- final int hashCode = entry.hashCode;
- visitor.startNewKey(key);
- visitor.nextValue(entry.value);
-
- addEntriesFromChain(entry.next, visitor, key, touchedTag, hashCode);
-
- // go over the other hash tables and collect their entries for the key
- for (int followupTable = rootTable + 1; followupTable < numTables; followupTable++) {
- Entry<K, V> followupEntry = maps[followupTable].table[pos >> shifts[followupTable]];
- if (followupEntry != null) {
- addEntriesFromChain(followupEntry, visitor, key, touchedTag, hashCode);
- }
- }
-
- visitor.keyDone();
- }
-
- entry = entry.next;
- }
- }
- }
- }
-
- private static <K, V> void addEntriesFromChain(
- Entry<K, V> entry,
- TraversalEvaluator<K, V> visitor,
- K key,
- long touchedTag,
- int hashCode) throws Exception
- {
- while (entry != null) {
- if (entry.touchedTag < touchedTag && entry.hashCode == hashCode && entry.key.equals(key)) {
- entry.touchedTag = touchedTag;
- visitor.nextValue(entry.value);
- }
- entry = entry.next;
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Comparator that defines a descending order on maps depending on their table capacity
- * and number of elements.
- */
- static final class CapacityDescendingComparator implements Comparator<KeyMap<?, ?>> {
-
- static final CapacityDescendingComparator INSTANCE = new CapacityDescendingComparator();
-
- private CapacityDescendingComparator() {}
-
-
- @Override
- public int compare(KeyMap<?, ?> o1, KeyMap<?, ?> o2) {
- // this sorts descending
- int cmp = o2.getLog2TableCapacity() - o1.getLog2TableCapacity();
- if (cmp != 0) {
- return cmp;
- }
- else {
- return o2.size() - o1.size();
- }
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * A factory for lazy/on-demand instantiation of values.
- *
- * @param <V> The type created by the factory.
- */
- public static interface LazyFactory<V> {
-
- /**
- * The factory method; creates the value.
- * @return The value.
- */
- V create();
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * A visitor for a traversal over the union of multiple hash maps. The visitor is
- * called for each key in the union of the maps and all values associated with that key
- * (one per map, but multiple across maps).
- *
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- */
- public static interface TraversalEvaluator<K, V> {
-
- /**
- * Called whenever the traversal starts with a new key.
- *
- * @param key The key traversed.
- * @throws Exception Method forwards all exceptions.
- */
- void startNewKey(K key) throws Exception;
-
- /**
- * Called for each value found for the current key.
- *
- * @param value The next value.
- * @throws Exception Method forwards all exceptions.
- */
- void nextValue(V value) throws Exception;
-
- /**
- * Called when the traversal for the current key is complete.
- *
- * @throws Exception Method forwards all exceptions.
- */
- void keyDone() throws Exception;
- }
-}