You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:47 UTC
[31/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
deleted file mode 100644
index 5de6cd1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ /dev/null
@@ -1,539 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Window operator for non-keyed windows.
- *
- * @see org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
- *
- * @param <IN> The type of the incoming elements.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
- */
-public class NonKeyedWindowOperator<IN, OUT, W extends Window>
- extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, OUT, W>>
- implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable, OutputTypeConfigurable<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
-
- // ------------------------------------------------------------------------
- // Configuration values and stuff from the user
- // ------------------------------------------------------------------------
-
- private final WindowAssigner<? super IN, W> windowAssigner;
-
- private final Trigger<? super IN, ? super W> trigger;
-
- private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
-
- /**
- * If this is true. The current processing time is set as the timestamp of incoming elements.
- * This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
- * if eviction should happen based on processing time.
- */
- private boolean setProcessingTime = false;
-
- /**
- * This is used to copy the incoming element because it can be put into several window
- * buffers.
- */
- private TypeSerializer<IN> inputSerializer;
-
- /**
- * For serializing the window in checkpoints.
- */
- private final TypeSerializer<W> windowSerializer;
-
- // ------------------------------------------------------------------------
- // State that is not checkpointed
- // ------------------------------------------------------------------------
-
- /**
- * Processing time timers that are currently in-flight.
- */
- private transient Map<Long, Set<Context>> processingTimeTimers;
-
- /**
- * Current waiting watermark callbacks.
- */
- private transient Map<Long, Set<Context>> watermarkTimers;
-
- /**
- * This is given to the {@code WindowFunction} for emitting elements with a given timestamp.
- */
- protected transient TimestampedCollector<OUT> timestampedCollector;
-
- // ------------------------------------------------------------------------
- // State that needs to be checkpointed
- // ------------------------------------------------------------------------
-
- /**
- * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer}
- * and a {@code TriggerContext} that stores the {@code Trigger} for that pane.
- */
- protected transient Map<W, Context> windows;
-
- /**
- * Creates a new {@code WindowOperator} based on the given policies and user functions.
- */
- public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
- TypeSerializer<W> windowSerializer,
- WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
- AllWindowFunction<IN, OUT, W> windowFunction,
- Trigger<? super IN, ? super W> trigger) {
-
- super(windowFunction);
-
- this.windowAssigner = requireNonNull(windowAssigner);
- this.windowSerializer = windowSerializer;
-
- this.windowBufferFactory = requireNonNull(windowBufferFactory);
- this.trigger = requireNonNull(trigger);
-
- setChainingStrategy(ChainingStrategy.ALWAYS);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
- inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
- }
-
- @Override
- public final void open() throws Exception {
- super.open();
- timestampedCollector = new TimestampedCollector<>(output);
-
- if (inputSerializer == null) {
- throw new IllegalStateException("Input serializer was not set.");
- }
-
- windowBufferFactory.setRuntimeContext(getRuntimeContext());
- windowBufferFactory.open(getUserFunctionParameters());
-
- // these could already be initialized from restoreState()
- if (watermarkTimers == null) {
- watermarkTimers = new HashMap<>();
- }
- if (processingTimeTimers == null) {
- processingTimeTimers = new HashMap<>();
- }
- if (windows == null) {
- windows = new HashMap<>();
- }
-
- // re-register timers that this window context had set
- for (Context context: windows.values()) {
- if (context.processingTimeTimer > 0) {
- Set<Context> triggers = processingTimeTimers.get(context.processingTimeTimer);
- if (triggers == null) {
- getRuntimeContext().registerTimer(context.processingTimeTimer, NonKeyedWindowOperator.this);
- triggers = new HashSet<>();
- processingTimeTimers.put(context.processingTimeTimer, triggers);
- }
- triggers.add(context);
- }
- if (context.watermarkTimer > 0) {
- Set<Context> triggers = watermarkTimers.get(context.watermarkTimer);
- if (triggers == null) {
- triggers = new HashSet<>();
- watermarkTimers.put(context.watermarkTimer, triggers);
- }
- triggers.add(context);
- }
-
- }
- }
-
- @Override
- public final void close() throws Exception {
- super.close();
- // emit the elements that we still keep
- for (Context window: windows.values()) {
- emitWindow(window);
- }
- windows.clear();
- windowBufferFactory.close();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public final void processElement(StreamRecord<IN> element) throws Exception {
- if (setProcessingTime) {
- element.replace(element.getValue(), System.currentTimeMillis());
- }
-
- Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
-
- for (W window: elementWindows) {
- Context context = windows.get(window);
- if (context == null) {
- WindowBuffer<IN> windowBuffer = windowBufferFactory.create();
- context = new Context(window, windowBuffer);
- windows.put(window, context);
- }
- StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
- context.windowBuffer.storeElement(elementCopy);
- Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
- processTriggerResult(triggerResult, window);
- }
- }
-
- protected void emitWindow(Context context) throws Exception {
- timestampedCollector.setTimestamp(context.window.maxTimestamp());
-
- userFunction.apply(
- context.window,
- context.windowBuffer.getUnpackedElements(),
- timestampedCollector);
- }
-
- private void processTriggerResult(Trigger.TriggerResult triggerResult, W window) throws Exception {
- switch (triggerResult) {
- case FIRE: {
- Context context = windows.get(window);
- if (context == null) {
- LOG.debug("Window {} already gone.", window);
- return;
- }
-
-
- emitWindow(context);
- break;
- }
-
- case FIRE_AND_PURGE: {
- Context context = windows.remove(window);
- if (context == null) {
- LOG.debug("Window {} already gone.", window);
- return;
- }
-
- emitWindow(context);
- break;
- }
-
- case CONTINUE:
- // ingore
- }
- }
-
- @Override
- public final void processWatermark(Watermark mark) throws Exception {
- Set<Long> toRemove = new HashSet<>();
-
- for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
- if (triggers.getKey() <= mark.getTimestamp()) {
- for (Context context: triggers.getValue()) {
- Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey());
- processTriggerResult(triggerResult, context.window);
- }
- toRemove.add(triggers.getKey());
- }
- }
-
- for (Long l: toRemove) {
- watermarkTimers.remove(l);
- }
- output.emitWatermark(mark);
- }
-
- @Override
- public final void trigger(long time) throws Exception {
- Set<Long> toRemove = new HashSet<>();
-
- for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
- if (triggers.getKey() < time) {
- for (Context context: triggers.getValue()) {
- Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
- processTriggerResult(triggerResult, context.window);
- }
- toRemove.add(triggers.getKey());
- }
- }
-
- for (Long l: toRemove) {
- processingTimeTimers.remove(l);
- }
- }
-
- /**
- * A context object that is given to {@code Trigger} functions to allow them to register
- * timer/watermark callbacks.
- */
- protected class Context implements Trigger.TriggerContext {
- protected W window;
-
- protected WindowBuffer<IN> windowBuffer;
-
- protected HashMap<String, Serializable> state;
-
- // use these to only allow one timer in flight at a time of each type
- // if the trigger registers another timer this value here will be overwritten,
- // the timer is not removed from the set of in-flight timers to improve performance.
- // When a trigger fires it is just checked against the last timer that was set.
- protected long watermarkTimer;
- protected long processingTimeTimer;
-
- public Context(
- W window,
- WindowBuffer<IN> windowBuffer) {
- this.window = window;
- this.windowBuffer = windowBuffer;
- state = new HashMap<>();
-
- this.watermarkTimer = -1;
- this.processingTimeTimer = -1;
- }
-
-
- @SuppressWarnings("unchecked")
- protected Context(DataInputView in) throws Exception {
- this.window = windowSerializer.deserialize(in);
- this.watermarkTimer = in.readLong();
- this.processingTimeTimer = in.readLong();
-
- int stateSize = in.readInt();
- byte[] stateData = new byte[stateSize];
- in.read(stateData);
- ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
- state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
-
- this.windowBuffer = windowBufferFactory.create();
- int numElements = in.readInt();
- MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
- for (int i = 0; i < numElements; i++) {
- windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
- }
- }
-
- protected void writeToState(StateBackend.CheckpointStateOutputView out) throws IOException {
- windowSerializer.serialize(window, out);
- out.writeLong(watermarkTimer);
- out.writeLong(processingTimeTimer);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- SerializationUtils.serialize(state, baos);
- out.writeInt(baos.size());
- out.write(baos.toByteArray(), 0, baos.size());
-
- MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
- out.writeInt(windowBuffer.size());
- for (StreamRecord<IN> element: windowBuffer.getElements()) {
- recordSerializer.serialize(element, out);
- }
- }
-
- @SuppressWarnings("unchecked")
- public <S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState) {
- return new OperatorState<S>() {
- @Override
- public S value() throws IOException {
- Serializable value = state.get(name);
- if (value == null) {
- state.put(name, defaultState);
- value = defaultState;
- }
- return (S) value;
- }
-
- @Override
- public void update(S value) throws IOException {
- state.put(name, value);
- }
- };
- }
-
- @Override
- public void registerProcessingTimeTimer(long time) {
- if (this.processingTimeTimer == time) {
- // we already have set a trigger for that time
- return;
- }
- Set<Context> triggers = processingTimeTimers.get(time);
- if (triggers == null) {
- getRuntimeContext().registerTimer(time, NonKeyedWindowOperator.this);
- triggers = new HashSet<>();
- processingTimeTimers.put(time, triggers);
- }
- this.processingTimeTimer = time;
- triggers.add(this);
- }
-
- @Override
- public void registerEventTimeTimer(long time) {
- if (watermarkTimer == time) {
- // we already have set a trigger for that time
- return;
- }
- Set<Context> triggers = watermarkTimers.get(time);
- if (triggers == null) {
- triggers = new HashSet<>();
- watermarkTimers.put(time, triggers);
- }
- this.watermarkTimer = time;
- triggers.add(this);
- }
-
- public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
- if (time == processingTimeTimer) {
- return trigger.onProcessingTime(time, this);
- } else {
- return Trigger.TriggerResult.CONTINUE;
- }
- }
-
- public Trigger.TriggerResult onEventTime(long time) throws Exception {
- if (time == watermarkTimer) {
- return trigger.onEventTime(time, this);
- } else {
- return Trigger.TriggerResult.CONTINUE;
- }
- }
- }
-
- /**
- * When this flag is enabled the current processing time is set as the timestamp of elements
- * upon arrival. This must be used, for example, when using the
- * {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} with processing
- * time semantics.
- */
- public NonKeyedWindowOperator<IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
- this.setProcessingTime = setProcessingTime;
- return this;
- }
-
- @Override
- public final void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
- if (userFunction instanceof OutputTypeConfigurable) {
- @SuppressWarnings("unchecked")
- OutputTypeConfigurable<OUT> typeConfigurable = (OutputTypeConfigurable<OUT>) userFunction;
- typeConfigurable.setOutputType(outTypeInfo, executionConfig);
- }
- }
-
- // ------------------------------------------------------------------------
- // Checkpointing
- // ------------------------------------------------------------------------
-
- @Override
- public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
- StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-
- // we write the panes with the key/value maps into the stream
- StateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-
- int numWindows = windows.size();
- out.writeInt(numWindows);
- for (Context context: windows.values()) {
- context.writeToState(out);
- }
-
- taskState.setOperatorState(out.closeAndGetHandle());
- return taskState;
- }
-
- @Override
- public void restoreState(StreamTaskState taskState) throws Exception {
- super.restoreState(taskState);
-
-
- @SuppressWarnings("unchecked")
- StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
- DataInputView in = inputState.getState(getUserCodeClassloader());
-
- int numWindows = in.readInt();
- this.windows = new HashMap<>(numWindows);
- this.processingTimeTimers = new HashMap<>();
- this.watermarkTimers = new HashMap<>();
-
- for (int j = 0; j < numWindows; j++) {
- Context context = new Context(in);
- windows.put(context.window, context);
- }
- }
-
-
- // ------------------------------------------------------------------------
- // Getters for testing
- // ------------------------------------------------------------------------
-
- @VisibleForTesting
- public boolean isSetProcessingTime() {
- return setProcessingTime;
- }
-
- @VisibleForTesting
- public Trigger<? super IN, ? super W> getTrigger() {
- return trigger;
- }
-
- @VisibleForTesting
- public WindowAssigner<? super IN, W> getWindowAssigner() {
- return windowAssigner;
- }
-
- @VisibleForTesting
- public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
- return windowBufferFactory;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
deleted file mode 100644
index 2491c57..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ /dev/null
@@ -1,625 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * An operator that implements the logic for windowing based on a {@link WindowAssigner} and
- * {@link Trigger}.
- *
- * <p>
- * When an element arrives it gets assigned a key using a {@link KeySelector} and it get's
- * assigned to zero or more windows using a {@link WindowAssigner}. Based on this the element
- * is put into panes. A pane is the bucket of elements that have the same key and same
- * {@code Window}. An element can be in multiple panes of it was assigned to multiple windows by the
- * {@code WindowAssigner}.
- *
- * <p>
- * Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
- * the contents of the pane should be processed to emit results. When a trigger fires,
- * the given {@link WindowFunction} is invoked to produce the results that are emitted for
- * the pane to which the {@code Trigger} belongs.
- *
- * <p>
- * This operator also needs a {@link WindowBufferFactory} to create a buffer for storing the
- * elements of each pane.
- *
- * @param <K> The type of key returned by the {@code KeySelector}.
- * @param <IN> The type of the incoming elements.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
- */
-public class WindowOperator<K, IN, OUT, W extends Window>
- extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, K, W>>
- implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable, OutputTypeConfigurable<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
-
- // ------------------------------------------------------------------------
- // Configuration values and user functions
- // ------------------------------------------------------------------------
-
- private final WindowAssigner<? super IN, W> windowAssigner;
-
- private final KeySelector<IN, K> keySelector;
-
- private final Trigger<? super IN, ? super W> trigger;
-
- private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
-
- /**
- * If this is true. The current processing time is set as the timestamp of incoming elements.
- * This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
- * if eviction should happen based on processing time.
- */
- private boolean setProcessingTime = false;
-
- /**
- * This is used to copy the incoming element because it can be put into several window
- * buffers.
- */
- private TypeSerializer<IN> inputSerializer;
-
- /**
- * For serializing the key in checkpoints.
- */
- private final TypeSerializer<K> keySerializer;
-
- /**
- * For serializing the window in checkpoints.
- */
- private final TypeSerializer<W> windowSerializer;
-
- // ------------------------------------------------------------------------
- // State that is not checkpointed
- // ------------------------------------------------------------------------
-
- /**
- * Processing time timers that are currently in-flight.
- */
- private transient Map<Long, Set<Context>> processingTimeTimers;
-
- /**
- * Current waiting watermark callbacks.
- */
- private transient Map<Long, Set<Context>> watermarkTimers;
-
- /**
- * This is given to the {@code WindowFunction} for emitting elements with a given timestamp.
- */
- protected transient TimestampedCollector<OUT> timestampedCollector;
-
- // ------------------------------------------------------------------------
- // State that needs to be checkpointed
- // ------------------------------------------------------------------------
-
- /**
- * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer}
- * and a {@code TriggerContext} that stores the {@code Trigger} for that pane.
- */
- protected transient Map<K, Map<W, Context>> windows;
-
- /**
- * Creates a new {@code WindowOperator} based on the given policies and user functions.
- */
- public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
- TypeSerializer<W> windowSerializer,
- KeySelector<IN, K> keySelector,
- TypeSerializer<K> keySerializer,
- WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
- WindowFunction<IN, OUT, K, W> windowFunction,
- Trigger<? super IN, ? super W> trigger) {
-
- super(windowFunction);
-
- this.windowAssigner = requireNonNull(windowAssigner);
- this.windowSerializer = windowSerializer;
- this.keySelector = requireNonNull(keySelector);
- this.keySerializer = requireNonNull(keySerializer);
-
- this.windowBufferFactory = requireNonNull(windowBufferFactory);
- this.trigger = requireNonNull(trigger);
-
- setChainingStrategy(ChainingStrategy.ALWAYS);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
- inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
- }
-
- @Override
- public final void open() throws Exception {
- super.open();
-
- timestampedCollector = new TimestampedCollector<>(output);
-
- if (inputSerializer == null) {
- throw new IllegalStateException("Input serializer was not set.");
- }
-
- windowBufferFactory.setRuntimeContext(getRuntimeContext());
- windowBufferFactory.open(getUserFunctionParameters());
-
-
- // these could already be initialized from restoreState()
- if (watermarkTimers == null) {
- watermarkTimers = new HashMap<>();
- }
- if (processingTimeTimers == null) {
- processingTimeTimers = new HashMap<>();
- }
- if (windows == null) {
- windows = new HashMap<>();
- }
-
- // re-register timers that this window context had set
- for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
- Map<W, Context> keyWindows = entry.getValue();
- for (Context context: keyWindows.values()) {
- if (context.processingTimeTimer > 0) {
- Set<Context> triggers = processingTimeTimers.get(context.processingTimeTimer);
- if (triggers == null) {
- getRuntimeContext().registerTimer(context.processingTimeTimer, WindowOperator.this);
- triggers = new HashSet<>();
- processingTimeTimers.put(context.processingTimeTimer, triggers);
- }
- triggers.add(context);
- }
- if (context.watermarkTimer > 0) {
- Set<Context> triggers = watermarkTimers.get(context.watermarkTimer);
- if (triggers == null) {
- triggers = new HashSet<>();
- watermarkTimers.put(context.watermarkTimer, triggers);
- }
- triggers.add(context);
- }
-
- }
- }
- }
-
- @Override
- public final void close() throws Exception {
- super.close();
- // emit the elements that we still keep
- for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
- Map<W, Context> keyWindows = entry.getValue();
- for (Context window: keyWindows.values()) {
- emitWindow(window);
- }
- }
- windows.clear();
- windowBufferFactory.close();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public final void processElement(StreamRecord<IN> element) throws Exception {
- if (setProcessingTime) {
- element.replace(element.getValue(), System.currentTimeMillis());
- }
-
- Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
-
- K key = keySelector.getKey(element.getValue());
-
- Map<W, Context> keyWindows = windows.get(key);
- if (keyWindows == null) {
- keyWindows = new HashMap<>();
- windows.put(key, keyWindows);
- }
-
- for (W window: elementWindows) {
- Context context = keyWindows.get(window);
- if (context == null) {
- WindowBuffer<IN> windowBuffer = windowBufferFactory.create();
- context = new Context(key, window, windowBuffer);
- keyWindows.put(window, context);
- }
- StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
- context.windowBuffer.storeElement(elementCopy);
- Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
- processTriggerResult(triggerResult, key, window);
- }
- }
-
- protected void emitWindow(Context context) throws Exception {
- timestampedCollector.setTimestamp(context.window.maxTimestamp());
-
- userFunction.apply(context.key,
- context.window,
- context.windowBuffer.getUnpackedElements(),
- timestampedCollector);
- }
-
- private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {
- switch (triggerResult) {
- case FIRE: {
- Map<W, Context> keyWindows = windows.get(key);
- if (keyWindows == null) {
- LOG.debug("Window {} for key {} already gone.", window, key);
- return;
- }
- Context context = keyWindows.get(window);
- if (context == null) {
- LOG.debug("Window {} for key {} already gone.", window, key);
- return;
- }
-
-
- emitWindow(context);
- break;
- }
-
- case FIRE_AND_PURGE: {
- Map<W, Context> keyWindows = windows.get(key);
- if (keyWindows == null) {
- LOG.debug("Window {} for key {} already gone.", window, key);
- return;
- }
- Context context = keyWindows.remove(window);
- if (context == null) {
- LOG.debug("Window {} for key {} already gone.", window, key);
- return;
- }
- if (keyWindows.isEmpty()) {
- windows.remove(key);
- }
-
- emitWindow(context);
- break;
- }
-
- case CONTINUE:
- // ingore
- }
- }
-
- @Override
- public final void processWatermark(Watermark mark) throws Exception {
- Set<Long> toRemove = new HashSet<>();
-
- for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
- if (triggers.getKey() <= mark.getTimestamp()) {
- for (Context context: triggers.getValue()) {
- Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey());
- processTriggerResult(triggerResult, context.key, context.window);
- }
- toRemove.add(triggers.getKey());
- }
- }
-
- for (Long l: toRemove) {
- watermarkTimers.remove(l);
- }
- output.emitWatermark(mark);
- }
-
- @Override
- public final void trigger(long time) throws Exception {
- Set<Long> toRemove = new HashSet<>();
-
- for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
- if (triggers.getKey() < time) {
- for (Context context: triggers.getValue()) {
- Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
- processTriggerResult(triggerResult, context.key, context.window);
- }
- toRemove.add(triggers.getKey());
- }
- }
-
- for (Long l: toRemove) {
- processingTimeTimers.remove(l);
- }
- }
-
- /**
- * A context object that is given to {@code Trigger} functions to allow them to register
- * timer/watermark callbacks.
- */
- protected class Context implements Trigger.TriggerContext {
- protected K key;
- protected W window;
-
- protected WindowBuffer<IN> windowBuffer;
-
- protected HashMap<String, Serializable> state;
-
- // use these to only allow one timer in flight at a time of each type
- // if the trigger registers another timer this value here will be overwritten,
- // the timer is not removed from the set of in-flight timers to improve performance.
- // When a trigger fires it is just checked against the last timer that was set.
- protected long watermarkTimer;
- protected long processingTimeTimer;
-
- public Context(K key,
- W window,
- WindowBuffer<IN> windowBuffer) {
- this.key = key;
- this.window = window;
- this.windowBuffer = windowBuffer;
- state = new HashMap<>();
-
- this.watermarkTimer = -1;
- this.processingTimeTimer = -1;
- }
-
- /**
- * Constructs a new {@code Context} by reading from a {@link DataInputView} that
- * contains a serialized context that we wrote in
- * {@link #writeToState(StateBackend.CheckpointStateOutputView)}
- */
- @SuppressWarnings("unchecked")
- protected Context(DataInputView in) throws Exception {
- this.key = keySerializer.deserialize(in);
- this.window = windowSerializer.deserialize(in);
- this.watermarkTimer = in.readLong();
- this.processingTimeTimer = in.readLong();
-
- int stateSize = in.readInt();
- byte[] stateData = new byte[stateSize];
- in.read(stateData);
- ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
- state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
-
- this.windowBuffer = windowBufferFactory.create();
- int numElements = in.readInt();
- MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
- for (int i = 0; i < numElements; i++) {
- windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
- }
- }
-
- /**
- * Writes the {@code Context} to the given state checkpoint output.
- */
- protected void writeToState(StateBackend.CheckpointStateOutputView out) throws IOException {
- keySerializer.serialize(key, out);
- windowSerializer.serialize(window, out);
- out.writeLong(watermarkTimer);
- out.writeLong(processingTimeTimer);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- SerializationUtils.serialize(state, baos);
- out.writeInt(baos.size());
- out.write(baos.toByteArray(), 0, baos.size());
-
- MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
- out.writeInt(windowBuffer.size());
- for (StreamRecord<IN> element: windowBuffer.getElements()) {
- recordSerializer.serialize(element, out);
- }
- }
-
- @SuppressWarnings("unchecked")
- public <S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState) {
- return new OperatorState<S>() {
- @Override
- public S value() throws IOException {
- Serializable value = state.get(name);
- if (value == null) {
- state.put(name, defaultState);
- value = defaultState;
- }
- return (S) value;
- }
-
- @Override
- public void update(S value) throws IOException {
- state.put(name, value);
- }
- };
- }
-
- @Override
- public void registerProcessingTimeTimer(long time) {
- if (this.processingTimeTimer == time) {
- // we already have set a trigger for that time
- return;
- }
- Set<Context> triggers = processingTimeTimers.get(time);
- if (triggers == null) {
- getRuntimeContext().registerTimer(time, WindowOperator.this);
- triggers = new HashSet<>();
- processingTimeTimers.put(time, triggers);
- }
- this.processingTimeTimer = time;
- triggers.add(this);
- }
-
- @Override
- public void registerEventTimeTimer(long time) {
- if (watermarkTimer == time) {
- // we already have set a trigger for that time
- return;
- }
- Set<Context> triggers = watermarkTimers.get(time);
- if (triggers == null) {
- triggers = new HashSet<>();
- watermarkTimers.put(time, triggers);
- }
- this.watermarkTimer = time;
- triggers.add(this);
- }
-
- public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
- if (time == processingTimeTimer) {
- return trigger.onProcessingTime(time, this);
- } else {
- return Trigger.TriggerResult.CONTINUE;
- }
- }
-
- public Trigger.TriggerResult onEventTime(long time) throws Exception {
- if (time == watermarkTimer) {
- return trigger.onEventTime(time, this);
- } else {
- return Trigger.TriggerResult.CONTINUE;
- }
- }
- }
-
- /**
- * When this flag is enabled the current processing time is set as the timestamp of elements
- * upon arrival. This must be used, for example, when using the
- * {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} with processing
- * time semantics.
- */
- public WindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
- this.setProcessingTime = setProcessingTime;
- return this;
- }
-
- @Override
- public final void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
- if (userFunction instanceof OutputTypeConfigurable) {
- @SuppressWarnings("unchecked")
- OutputTypeConfigurable<OUT> typeConfigurable = (OutputTypeConfigurable<OUT>) userFunction;
- typeConfigurable.setOutputType(outTypeInfo, executionConfig);
- }
- }
-
- // ------------------------------------------------------------------------
- // Checkpointing
- // ------------------------------------------------------------------------
-
- @Override
- public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
- StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-
- // we write the panes with the key/value maps into the stream
- StateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-
- int numKeys = windows.size();
- out.writeInt(numKeys);
-
- for (Map.Entry<K, Map<W, Context>> keyWindows: windows.entrySet()) {
- int numWindows = keyWindows.getValue().size();
- out.writeInt(numWindows);
- for (Context context: keyWindows.getValue().values()) {
- context.writeToState(out);
- }
- }
-
- taskState.setOperatorState(out.closeAndGetHandle());
- return taskState;
- }
-
- @Override
- public void restoreState(StreamTaskState taskState) throws Exception {
- super.restoreState(taskState);
-
-
- @SuppressWarnings("unchecked")
- StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
- DataInputView in = inputState.getState(getUserCodeClassloader());
-
- int numKeys = in.readInt();
- this.windows = new HashMap<>(numKeys);
- this.processingTimeTimers = new HashMap<>();
- this.watermarkTimers = new HashMap<>();
-
- for (int i = 0; i < numKeys; i++) {
- int numWindows = in.readInt();
- for (int j = 0; j < numWindows; j++) {
- Context context = new Context(in);
- Map<W, Context> keyWindows = windows.get(context.key);
- if (keyWindows == null) {
- keyWindows = new HashMap<>(numWindows);
- windows.put(context.key, keyWindows);
- }
- keyWindows.put(context.window, context);
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // Getters for testing
- // ------------------------------------------------------------------------
-
- @VisibleForTesting
- public boolean isSetProcessingTime() {
- return setProcessingTime;
- }
-
- @VisibleForTesting
- public Trigger<? super IN, ? super W> getTrigger() {
- return trigger;
- }
-
- @VisibleForTesting
- public KeySelector<IN, K> getKeySelector() {
- return keySelector;
- }
-
- @VisibleForTesting
- public WindowAssigner<? super IN, W> getWindowAssigner() {
- return windowAssigner;
- }
-
- @VisibleForTesting
- public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
- return windowBufferFactory;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
deleted file mode 100644
index 28365e1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-/**
- * A {@code WindowBuffer} that can also evict elements from the buffer. The order in which
- * the elements are added is preserved. Elements can only be evicted started from the beginning of
- * the buffer.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-
-public interface EvictingWindowBuffer<T> extends WindowBuffer<T> {
-
- /**
- * Removes the given number of elements, starting from the beginning.
- * @param count The number of elements to remove.
- */
- void removeElements(int count);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
deleted file mode 100644
index f9f8b26..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.ArrayDeque;
-
-/**
- * An {@link EvictingWindowBuffer} that stores elements on the Java Heap.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-public class HeapWindowBuffer<T> implements EvictingWindowBuffer<T> {
- private static final long serialVersionUID = 1L;
-
- private ArrayDeque<StreamRecord<T>> elements;
-
- protected HeapWindowBuffer() {
- this.elements = new ArrayDeque<>();
- }
-
- @Override
- public void storeElement(StreamRecord<T> element) {
- elements.add(element);
- }
-
- @Override
- public void removeElements(int count) {
- // TODO determine if this can be done in a better way
- for (int i = 0; i < count; i++) {
- elements.removeFirst();
- }
- }
-
- @Override
- public Iterable<StreamRecord<T>> getElements() {
- return elements;
- }
-
- @Override
- public Iterable<T> getUnpackedElements() {
- return FluentIterable.from(elements).transform(new Function<StreamRecord<T>, T>() {
- @Override
- public T apply(StreamRecord<T> record) {
- return record.getValue();
- }
- });
- }
-
- @Override
- public int size() {
- return elements.size();
- }
-
- public static class Factory<T> implements WindowBufferFactory<T, HeapWindowBuffer<T>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void setRuntimeContext(RuntimeContext ctx) {}
-
- @Override
- public void open(Configuration config) {}
-
- @Override
- public void close() {}
-
- @Override
- public HeapWindowBuffer<T> create() {
- return new HeapWindowBuffer<>();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
deleted file mode 100644
index 37be8f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.Collections;
-
-/**
- * An {@link WindowBuffer} that stores elements on the Java Heap. This buffer uses a
- * {@link ReduceFunction} to pre-aggregate elements that are added to the buffer.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-
-public class PreAggregatingHeapWindowBuffer<T> implements WindowBuffer<T> {
- private static final long serialVersionUID = 1L;
-
- private final ReduceFunction<T> reduceFunction;
- private transient StreamRecord<T> data;
-
- protected PreAggregatingHeapWindowBuffer(ReduceFunction<T> reduceFunction) {
- this.reduceFunction = reduceFunction;
- }
-
- @Override
- public void storeElement(StreamRecord<T> element) throws Exception {
- if (data == null) {
- data = new StreamRecord<>(element.getValue(), element.getTimestamp());
- } else {
- data.replace(reduceFunction.reduce(data.getValue(), element.getValue()));
- }
- }
-
- @Override
- public Iterable<StreamRecord<T>> getElements() {
- return Collections.singleton(data);
- }
-
- @Override
- public Iterable<T> getUnpackedElements() {
- return Collections.singleton(data.getValue());
- }
-
- @Override
- public int size() {
- return 1;
- }
-
- public static class Factory<T> implements WindowBufferFactory<T, PreAggregatingHeapWindowBuffer<T>> {
- private static final long serialVersionUID = 1L;
-
- private final ReduceFunction<T> reduceFunction;
-
- public Factory(ReduceFunction<T> reduceFunction) {
- this.reduceFunction = reduceFunction;
- }
-
- @Override
- public void setRuntimeContext(RuntimeContext ctx) {
- FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
- }
-
- @Override
- public void open(Configuration config) throws Exception {
- FunctionUtils.openFunction(reduceFunction, config);
- }
-
- @Override
- public void close() throws Exception {
- FunctionUtils.closeFunction(reduceFunction);
- }
-
- @Override
- public PreAggregatingHeapWindowBuffer<T> create() {
- return new PreAggregatingHeapWindowBuffer<>(reduceFunction);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
deleted file mode 100644
index b111667..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.Serializable;
-
-/**
- * A {@code WindowBuffer} is used by
- * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} to store
- * the elements of one pane.
- *
- * <p>
- * A pane is the bucket of elements that have the same key (assigned by the
- * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
- * be in multiple panes of it was assigned to multiple windows by the
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
- * have their own instance of the {@code Evictor}.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-public interface WindowBuffer<T> extends Serializable {
-
- /**
- * Adds the element to the buffer.
- *
- * @param element The element to add.
- */
- void storeElement(StreamRecord<T> element) throws Exception;
-
- /**
- * Returns all elements that are currently in the buffer.
- */
- Iterable<StreamRecord<T>> getElements();
-
- /**
- * Returns all elements that are currently in the buffer. This will unwrap the contained
- * elements from their {@link StreamRecord}.
- */
- Iterable<T> getUnpackedElements();
-
- /**
- * Returns the number of elements that are currently in the buffer.
- */
- int size();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
deleted file mode 100644
index 4bcdf09..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-
-import java.io.Serializable;
-
-/**
- * A factory for {@link WindowBuffer WindowBuffers}.
- *
- * @param <T> The type of elements that the created {@code WindowBuffer} can store.
- * @param <B> The type of the created {@code WindowBuffer}
- */
-public interface WindowBufferFactory<T, B extends WindowBuffer<T>> extends Serializable {
-
- /**
- * Sets the {@link RuntimeContext} that is used to initialize eventual user functions
- * inside the created buffers.
- */
- void setRuntimeContext(RuntimeContext ctx);
-
- /**
- * Calls {@code open()} on eventual user functions inside the buffer.
- */
- void open(Configuration config) throws Exception;
-
- /**
- * Calls {@code close()} on eventual user functions inside the buffer.
- */
-
- void close() throws Exception;
-
- /**
- * Creates a new {@code WindowBuffer}.
- */
- B create();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
deleted file mode 100644
index 55749a1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains the operators that implement the various window operations
- * on data streams.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
deleted file mode 100644
index f3d851c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that selects all the output channels.
- *
- * @param <T> Type of the elements in the Stream being broadcast
- */
-public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
-
- int[] returnArray;
- boolean set;
- int setNumber;
-
- @Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
- int numberOfOutputChannels) {
- if (set && setNumber == numberOfOutputChannels) {
- return returnArray;
- } else {
- this.returnArray = new int[numberOfOutputChannels];
- for (int i = 0; i < numberOfOutputChannels; i++) {
- returnArray[i] = i;
- }
- set = true;
- setNumber = numberOfOutputChannels;
- return returnArray;
- }
- }
-
- @Override
- public StreamPartitioner<T> copy() {
- return this;
- }
-
- @Override
- public String toString() {
- return "BROADCAST";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
deleted file mode 100644
index 7bb9480..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that selects the channel with a user defined partitioner function on a key.
- *
- * @param <K>
- * Type of the key
- * @param <T>
- * Type of the data
- */
-public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
-
- private int[] returnArray = new int[1];
- Partitioner<K> partitioner;
- KeySelector<T, K> keySelector;
-
- public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
- this.partitioner = partitioner;
- this.keySelector = keySelector;
- }
-
- @Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
-
- K key = null;
- try {
- key = keySelector.getKey(record.getInstance().getValue());
- } catch (Exception e) {
- throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
- }
-
- returnArray[0] = partitioner.partition(key,
- numberOfOutputChannels);
-
- return returnArray;
- }
-
- @Override
- public StreamPartitioner<T> copy() {
- return this;
- }
-
- @Override
- public String toString() {
- return "CUSTOM";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
deleted file mode 100644
index 4fb460c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that forwards elements only to the locally running downstream operation.
- *
- * @param <T> Type of the elements in the Stream
- */
-public class ForwardPartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
-
- private int[] returnArray = new int[] {0};
-
- @Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
- return returnArray;
- }
-
- public StreamPartitioner<T> copy() {
- return this;
- }
-
- @Override
- public String toString() {
- return "FORWARD";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
deleted file mode 100644
index b19fb41..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that sends all elements to the downstream operator with subtask ID=0;
- *
- * @param <T> Type of the elements in the Stream being partitioned
- */
-public class GlobalPartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
-
- private int[] returnArray = new int[] { 0 };
-
- @Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
- int numberOfOutputChannels) {
- return returnArray;
- }
-
- @Override
- public StreamPartitioner<T> copy() {
- return this;
- }
-
- @Override
- public String toString() {
- return "GLOBAL";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
deleted file mode 100644
index a3f5158..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner selects the target channel based on the hash value of a key from a
- * {@link KeySelector}.
- *
- * @param <T> Type of the elements in the Stream being partitioned
- */
-public class HashPartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
-
- private int[] returnArray = new int[1];
- KeySelector<T, ?> keySelector;
-
- public HashPartitioner(KeySelector<T, ?> keySelector) {
- this.keySelector = keySelector;
- }
-
- @Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
- int numberOfOutputChannels) {
- Object key;
- try {
- key = keySelector.getKey(record.getInstance().getValue());
- } catch (Exception e) {
- throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
- }
- returnArray[0] = Math.abs(key.hashCode() % numberOfOutputChannels);
-
- return returnArray;
- }
-
- @Override
- public StreamPartitioner<T> copy() {
- return this;
- }
-
- @Override
- public String toString() {
- return "HASH";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
deleted file mode 100644
index 2dfff0e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that distributes the data equally by cycling through the output
- * channels.
- *
- * @param <T> Type of the elements in the Stream being rebalanced
- */
-public class RebalancePartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
-
- private int[] returnArray = new int[] {-1};
-
- @Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
- int numberOfOutputChannels) {
- this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
- return this.returnArray;
- }
-
- public StreamPartitioner<T> copy() {
- return this;
- }
-
- @Override
- public String toString() {
- return "REBALANCE";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
deleted file mode 100644
index 93c6f9c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import java.util.Random;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that distributes the data equally by selecting one output channel
- * randomly.
- *
- * @param <T>
- * Type of the Tuple
- */
-public class ShufflePartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
-
- private Random random = new Random();
-
- private int[] returnArray = new int[1];
-
- @Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
- int numberOfOutputChannels) {
- returnArray[0] = random.nextInt(numberOfOutputChannels);
- return returnArray;
- }
-
- @Override
- public StreamPartitioner<T> copy() {
- return new ShufflePartitioner<T>();
- }
-
- @Override
- public String toString() {
- return "SHUFFLE";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
deleted file mode 100644
index 4ef360d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.partitioner;
-
-import java.io.Serializable;
-
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public abstract class StreamPartitioner<T> implements
- ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
- private static final long serialVersionUID = 1L;
-
- public abstract StreamPartitioner<T> copy();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
deleted file mode 100644
index d4363cd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamrecord;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import java.io.IOException;
-
-/**
- * Serializer for {@link StreamRecord} and {@link org.apache.flink.streaming.api.watermark.Watermark}. This does not behave like a normal
- * {@link TypeSerializer}, instead, this is only used at the
- * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} level for transmitting
- * {@link StreamRecord StreamRecords} and {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}. This serializer
- * can handle both of them, therefore it returns {@link Object} the result has
- * to be cast to the correct type.
- *
- * @param <T> The type of value in the {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
- */
-public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<StreamElement> {
-
- private static final long serialVersionUID = 1L;
-
- private static final long IS_WATERMARK = Long.MIN_VALUE;
-
- private final TypeSerializer<T> typeSerializer;
-
-
- public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
- if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) {
- throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
- }
- this.typeSerializer = Preconditions.checkNotNull(serializer);
- }
-
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public TypeSerializer<StreamElement> duplicate() {
- TypeSerializer<T> copy = typeSerializer.duplicate();
- return (copy == typeSerializer) ? this : new MultiplexingStreamRecordSerializer<T>(copy);
- }
-
- @Override
- public StreamRecord<T> createInstance() {
- return new StreamRecord<T>(typeSerializer.createInstance(), 0L);
- }
-
- @Override
- public StreamElement copy(StreamElement from) {
- // we can reuse the timestamp since Instant is immutable
- if (from.isRecord()) {
- StreamRecord<T> fromRecord = from.asRecord();
- return new StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp());
- }
- else if (from.isWatermark()) {
- // is immutable
- return from;
- }
- else {
- throw new RuntimeException("Cannot copy " + from);
- }
- }
-
- @Override
- public StreamElement copy(StreamElement from, StreamElement reuse) {
- if (from.isRecord() && reuse.isRecord()) {
- StreamRecord<T> fromRecord = from.asRecord();
- StreamRecord<T> reuseRecord = reuse.asRecord();
-
- T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
- reuseRecord.replace(valueCopy, fromRecord.getTimestamp());
- return reuse;
- }
- else if (from.isWatermark()) {
- // is immutable
- return from;
- }
- else {
- throw new RuntimeException("Cannot copy " + from + " -> " + reuse);
- }
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public void serialize(StreamElement value, DataOutputView target) throws IOException {
- if (value.isRecord()) {
- StreamRecord<T> record = value.asRecord();
- target.writeLong(record.getTimestamp());
- typeSerializer.serialize(record.getValue(), target);
- }
- else if (value.isWatermark()) {
- target.writeLong(IS_WATERMARK);
- target.writeLong(value.asWatermark().getTimestamp());
- }
- else {
- throw new RuntimeException();
- }
- }
-
- @Override
- public StreamElement deserialize(DataInputView source) throws IOException {
- long millis = source.readLong();
-
- if (millis == IS_WATERMARK) {
- return new Watermark(source.readLong());
- }
- else {
- T element = typeSerializer.deserialize(source);
- return new StreamRecord<T>(element, millis);
- }
- }
-
- @Override
- public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException {
- long millis = source.readLong();
-
- if (millis == IS_WATERMARK) {
- return new Watermark(source.readLong());
- }
- else {
- StreamRecord<T> reuseRecord = reuse.asRecord();
- T element = typeSerializer.deserialize(reuseRecord.getValue(), source);
- reuseRecord.replace(element, millis);
- return reuse;
- }
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- long millis = source.readLong();
- target.writeLong(millis);
-
- if (millis == IS_WATERMARK) {
- target.writeLong(source.readLong());
- } else {
- typeSerializer.copy(source, target);
- }
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof MultiplexingStreamRecordSerializer) {
- MultiplexingStreamRecordSerializer<?> other = (MultiplexingStreamRecordSerializer<?>) obj;
-
- return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
- } else {
- return false;
- }
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof MultiplexingStreamRecordSerializer;
- }
-
- @Override
- public int hashCode() {
- return typeSerializer.hashCode();
- }
-}