You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:51 UTC
[35/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
deleted file mode 100644
index fbecbd1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link StreamOperator} for streaming sources.
- */
-public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction<T>> implements StreamOperator<T> {
-
- private static final long serialVersionUID = 1L;
- private transient SourceFunction.SourceContext<T> ctx;
-
- public StreamSource(SourceFunction<T> sourceFunction) {
- super(sourceFunction);
-
- this.chainingStrategy = ChainingStrategy.HEAD;
- }
-
- public void run(final Object lockingObject, final Output<StreamRecord<T>> collector) throws Exception {
- final ExecutionConfig executionConfig = getExecutionConfig();
-
- if (userFunction instanceof EventTimeSourceFunction) {
- ctx = new ManualWatermarkContext<T>(lockingObject, collector);
- } else if (executionConfig.getAutoWatermarkInterval() > 0) {
- ctx = new AutomaticWatermarkContext<T>(lockingObject, collector, executionConfig);
- } else if (executionConfig.areTimestampsEnabled()) {
- ctx = new NonWatermarkContext<T>(lockingObject, collector);
- } else {
- ctx = new NonTimestampContext<T>(lockingObject, collector);
- }
-
- userFunction.run(ctx);
-
- // This will mostly emit a final +Inf Watermark to make the Watermark logic work
- // when some sources finish before others do
- ctx.close();
- }
-
- public void cancel() {
- userFunction.cancel();
- ctx.close();
- }
-
- /**
- * {@link SourceFunction.SourceContext} to be used for sources that don't emit watermarks.
- * In addition to {@link NonWatermarkContext} this will also not attach timestamps to sources.
- * (Technically it will always set the timestamp to 0).
- */
- public static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {
-
- private final Object lockingObject;
- private final Output<StreamRecord<T>> output;
- private final StreamRecord<T> reuse;
-
- public NonTimestampContext(Object lockingObjectParam, Output<StreamRecord<T>> outputParam) {
- this.lockingObject = lockingObjectParam;
- this.output = outputParam;
- this.reuse = new StreamRecord<T>(null);
- }
-
- @Override
- public void collect(T element) {
- output.collect(reuse.replace(element, 0));
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
- " elements with a timestamp. See interface ManualTimestampSourceFunction" +
- " if you want to manually assign timestamps to elements.");
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
- " elements with a timestamp. See interface ManualTimestampSourceFunction" +
- " if you want to manually assign timestamps to elements.");
- }
-
- @Override
- public Object getCheckpointLock() {
- return lockingObject;
- }
-
- @Override
- public void close() {}
- }
-
- /**
- * {@link SourceFunction.SourceContext} to be used for sources that don't emit watermarks.
- */
- public static class NonWatermarkContext<T> implements SourceFunction.SourceContext<T> {
-
- private final Object lockingObject;
- private final Output<StreamRecord<T>> output;
- private final StreamRecord<T> reuse;
-
- public NonWatermarkContext(Object lockingObjectParam, Output<StreamRecord<T>> outputParam) {
- this.lockingObject = lockingObjectParam;
- this.output = outputParam;
- this.reuse = new StreamRecord<T>(null);
- }
-
- @Override
- public void collect(T element) {
- long currentTime = System.currentTimeMillis();
- output.collect(reuse.replace(element, currentTime));
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
- " elements with a timestamp. See interface ManualTimestampSourceFunction" +
- " if you want to manually assign timestamps to elements.");
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
- " elements with a timestamp. See interface ManualTimestampSourceFunction" +
- " if you want to manually assign timestamps to elements.");
- }
-
- @Override
- public Object getCheckpointLock() {
- return lockingObject;
- }
-
- @Override
- public void close() {}
- }
-
- /**
- * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
- * and watermark emission.
- */
- public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
-
- private final ScheduledExecutorService scheduleExecutor;
- private final ScheduledFuture<?> watermarkTimer;
- private final long watermarkInterval;
-
- private final Object lockingObject;
- private final Output<StreamRecord<T>> output;
- private final StreamRecord<T> reuse;
-
- private volatile long lastWatermarkTime;
-
- public AutomaticWatermarkContext(Object lockingObjectParam,
- Output<StreamRecord<T>> outputParam,
- ExecutionConfig executionConfig) {
- this.lockingObject = lockingObjectParam;
- this.output = outputParam;
- this.reuse = new StreamRecord<T>(null);
-
- watermarkInterval = executionConfig.getAutoWatermarkInterval();
-
- scheduleExecutor = Executors.newScheduledThreadPool(1);
-
- watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- long currentTime = System.currentTimeMillis();
- // align the watermarks across all machines. this will ensure that we
- // don't have watermarks that creep along at different intervals because
- // the machine clocks are out of sync
- long watermarkTime = currentTime - (currentTime % watermarkInterval);
- if (currentTime > watermarkTime && watermarkTime - lastWatermarkTime >= watermarkInterval) {
- synchronized (lockingObject) {
- if (currentTime > watermarkTime && watermarkTime - lastWatermarkTime >= watermarkInterval) {
- output.emitWatermark(new Watermark(watermarkTime));
- lastWatermarkTime = watermarkTime;
- }
- }
- }
- }
- }, 0, watermarkInterval, TimeUnit.MILLISECONDS);
-
- }
-
- @Override
- public void collect(T element) {
- synchronized (lockingObject) {
- long currentTime = System.currentTimeMillis();
- output.collect(reuse.replace(element, currentTime));
-
- long watermarkTime = currentTime - (currentTime % watermarkInterval);
- if (currentTime > watermarkTime && watermarkTime - lastWatermarkTime >= watermarkInterval) {
- output.emitWatermark(new Watermark(watermarkTime));
- lastWatermarkTime = watermarkTime;
- }
- }
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
- " elements with a timestamp. See interface ManualTimestampSourceFunction" +
- " if you want to manually assign timestamps to elements.");
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
- " elements with a timestamp. See interface ManualTimestampSourceFunction" +
- " if you want to manually assign timestamps to elements.");
- }
-
- @Override
- public Object getCheckpointLock() {
- return lockingObject;
- }
-
- @Override
- public void close() {
- watermarkTimer.cancel(true);
- scheduleExecutor.shutdownNow();
- // emit one last +Inf watermark to make downstream watermark processing work
- // when some sources close early
- synchronized (lockingObject) {
- output.emitWatermark(new Watermark(Long.MAX_VALUE));
- }
- }
- }
-
- /**
- * {@link SourceFunction.SourceContext} to be used for sources with manual timestamp
- * assignment and manual watermark emission.
- */
- public static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
-
- private final Object lockingObject;
- private final Output<StreamRecord<T>> output;
- private final StreamRecord<T> reuse;
-
- public ManualWatermarkContext(Object lockingObject, Output<StreamRecord<T>> output) {
- this.lockingObject = lockingObject;
- this.output = output;
- this.reuse = new StreamRecord<T>(null);
- }
-
- @Override
- public void collect(T element) {
- throw new UnsupportedOperationException("Manual-Timestamp sources can only emit" +
- " elements with a timestamp. Please use collectWithTimestamp().");
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- synchronized (lockingObject) {
- output.collect(reuse.replace(element, timestamp));
- }
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- output.emitWatermark(mark);
- }
-
- @Override
- public Object getCheckpointLock() {
- return lockingObject;
- }
-
- @Override
- public void close() {
- // emit one last +Inf watermark to make downstream watermark processing work
- // when some sources close early
- synchronized (lockingObject) {
- output.emitWatermark(new Watermark(Long.MAX_VALUE));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
deleted file mode 100644
index 87a9abd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Implementation of the {@link org.apache.flink.api.common.functions.RuntimeContext},
- * for streaming operators.
- */
-public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
-
- /** The operator to which this function belongs */
- private final AbstractStreamOperator<?> operator;
-
- /** The task environment running the operator */
- private final Environment taskEnvironment;
-
- /** The key/value state, if the user-function requests it */
- private HashMap<String, OperatorState<?>> keyValueStates;
-
- /** Type of the values stored in the state, to make sure repeated requests of the state are consistent */
- private HashMap<String, TypeInformation<?>> stateTypeInfos;
-
-
- public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
- Environment env, Map<String, Accumulator<?, ?>> accumulators) {
- super(env.getTaskName(),
- env.getNumberOfSubtasks(),
- env.getIndexInSubtaskGroup(),
- env.getUserClassLoader(),
- operator.getExecutionConfig(),
- accumulators,
- env.getDistributedCacheEntries());
-
- this.operator = operator;
- this.taskEnvironment = env;
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Returns the input split provider associated with the operator.
- *
- * @return The input split provider.
- */
- public InputSplitProvider getInputSplitProvider() {
- return taskEnvironment.getInputSplitProvider();
- }
-
- /**
- * Register a timer callback. At the specified time the {@link Triggerable } will be invoked.
- * This call is guaranteed to not happen concurrently with method calls on the operator.
- *
- * @param time The absolute time in milliseconds.
- * @param target The target to be triggered.
- */
- public void registerTimer(long time, Triggerable target) {
- operator.registerTimer(time, target);
- }
-
- // ------------------------------------------------------------------------
- // broadcast variables
- // ------------------------------------------------------------------------
-
- @Override
- public <RT> List<RT> getBroadcastVariable(String name) {
- throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs");
- }
-
- @Override
- public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
- throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs");
- }
-
- // ------------------------------------------------------------------------
- // key/value state
- // ------------------------------------------------------------------------
-
- @Override
- public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
- requireNonNull(stateType, "The state type class must not be null");
-
- TypeInformation<S> typeInfo;
- try {
- typeInfo = TypeExtractor.getForClass(stateType);
- }
- catch (Exception e) {
- throw new RuntimeException("Cannot analyze type '" + stateType.getName() +
- "' from the class alone, due to generic type parameters. " +
- "Please specify the TypeInformation directly.", e);
- }
-
- return getKeyValueState(name, typeInfo, defaultState);
- }
-
- @Override
- public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
- requireNonNull(name, "The name of the state must not be null");
- requireNonNull(stateType, "The state type information must not be null");
-
- OperatorState<?> previousState;
-
- // check if this is a repeated call to access the state
- if (this.stateTypeInfos != null && this.keyValueStates != null &&
- (previousState = this.keyValueStates.get(name)) != null) {
-
- // repeated call
- TypeInformation<?> previousType;
- if (stateType.equals((previousType = this.stateTypeInfos.get(name)))) {
- // valid case, same type requested again
- @SuppressWarnings("unchecked")
- OperatorState<S> previous = (OperatorState<S>) previousState;
- return previous;
- }
- else {
- // invalid case, different type requested this time
- throw new IllegalStateException("Cannot initialize key/value state for type " + stateType +
- " ; The key/value state has already been created and initialized for a different type: " +
- previousType);
- }
- }
- else {
- // first time access to the key/value state
- if (this.stateTypeInfos == null) {
- this.stateTypeInfos = new HashMap<>();
- }
- if (this.keyValueStates == null) {
- this.keyValueStates = new HashMap<>();
- }
-
- try {
- OperatorState<S> state = operator.createKeyValueState(name, stateType, defaultState);
- this.keyValueStates.put(name, state);
- this.stateTypeInfos.put(name, stateType);
- return state;
- }
- catch (RuntimeException e) {
- throw e;
- }
- catch (Exception e) {
- throw new RuntimeException("Cannot initialize the key/value state", e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
deleted file mode 100644
index 62514fc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * Wrapper around an {@link Output} for user functions that expect a {@link Collector}.
- * Before giving the {@link TimestampedCollector} to a user function you must set
- * the {@link Instant timestamp} that should be attached to emitted elements. Most operators
- * would set the {@link Instant timestamp} of the incoming {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here.
- *
- * @param <T> The type of the elments that can be emitted.
- */
-public class TimestampedCollector<T> implements Collector<T> {
-
- private final Output<StreamRecord<T>> output;
-
- private final StreamRecord<T> reuse;
-
- private long timestamp;
-
-
- /**
- * Creates a new {@link TimestampedCollector} that wraps the given {@link Output}.
- */
- public TimestampedCollector(Output<StreamRecord<T>> output) {
- this.output = output;
- this.reuse = new StreamRecord<T>(null);
- }
-
- @Override
- public void collect(T record) {
- output.collect(reuse.replace(record, timestamp));
- }
-
- /**
- * Sets the {@link Instant timestamp} that is attached to elements that get emitted using
- * {@link #collect}
- * @param timestamp The timestamp in milliseconds
- */
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
-
- @Override
- public void close() {
- output.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
deleted file mode 100644
index afc6d1b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Interface for stream operators with two inputs. Use
- * {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if
- * you want to implement a custom operator.
- *
- * @param <IN1> The input type of the operator
- * @param <IN2> The input type of the operator
- * @param <OUT> The output type of the operator
- */
-public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
-
- /**
- * Processes one element that arrived on the first input of this two-input operator.
- * This method is guaranteed to not be called concurrently with other methods of the operator.
- */
- public void processElement1(StreamRecord<IN1> element) throws Exception;
-
- /**
- * Processes one element that arrived on the second input of this two-input operator.
- * This method is guaranteed to not be called concurrently with other methods of the operator.
- */
- public void processElement2(StreamRecord<IN2> element) throws Exception;
-
- /**
- * Processes a {@link Watermark} that arrived on the first input of this two-input operator.
- * This method is guaranteed to not be called concurrently with other methods of the operator.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- */
- public void processWatermark1(Watermark mark) throws Exception;
-
- /**
- * Processes a {@link Watermark} that arrived on the second input of this two-input operator.
- * This method is guaranteed to not be called concurrently with other methods of the operator.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- */
- public void processWatermark2(Watermark mark) throws Exception;
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
deleted file mode 100644
index 806cef2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class CoStreamFlatMap<IN1, IN2, OUT>
- extends AbstractUdfStreamOperator<OUT, CoFlatMapFunction<IN1, IN2, OUT>>
- implements TwoInputStreamOperator<IN1, IN2, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private transient TimestampedCollector<OUT> collector;
-
- // We keep track of watermarks from both inputs, the combined input is the minimum
- // Once the minimum advances we emit a new watermark for downstream operators
- private long combinedWatermark = Long.MIN_VALUE;
- private long input1Watermark = Long.MIN_VALUE;
- private long input2Watermark = Long.MIN_VALUE;
-
- public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
- super(flatMapper);
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- collector = new TimestampedCollector<OUT>(output);
- }
-
- @Override
- public void processElement1(StreamRecord<IN1> element) throws Exception {
- collector.setTimestamp(element.getTimestamp());
- userFunction.flatMap1(element.getValue(), collector);
-
- }
-
- @Override
- public void processElement2(StreamRecord<IN2> element) throws Exception {
- collector.setTimestamp(element.getTimestamp());
- userFunction.flatMap2(element.getValue(), collector);
- }
-
- @Override
- public void processWatermark1(Watermark mark) throws Exception {
- input1Watermark = mark.getTimestamp();
- long newMin = Math.min(input1Watermark, input2Watermark);
- if (newMin > combinedWatermark) {
- combinedWatermark = newMin;
- output.emitWatermark(new Watermark(combinedWatermark));
- }
- }
-
- @Override
- public void processWatermark2(Watermark mark) throws Exception {
- input2Watermark = mark.getTimestamp();
- long newMin = Math.min(input1Watermark, input2Watermark);
- if (newMin > combinedWatermark) {
- combinedWatermark = newMin;
- output.emitWatermark(new Watermark(combinedWatermark));
- }
- }
-
- protected TimestampedCollector<OUT> getCollector() {
- return collector;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
deleted file mode 100644
index e34e727..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class CoStreamMap<IN1, IN2, OUT>
- extends AbstractUdfStreamOperator<OUT, CoMapFunction<IN1, IN2, OUT>>
- implements TwoInputStreamOperator<IN1, IN2, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- // We keep track of watermarks from both inputs, the combined input is the minimum
- // Once the minimum advances we emit a new watermark for downstream operators
- private long combinedWatermark = Long.MIN_VALUE;
- private long input1Watermark = Long.MIN_VALUE;
- private long input2Watermark = Long.MIN_VALUE;
-
- public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
- super(mapper);
- }
-
- @Override
- public void processElement1(StreamRecord<IN1> element) throws Exception {
- output.collect(element.replace(userFunction.map1(element.getValue())));
- }
-
- @Override
- public void processElement2(StreamRecord<IN2> element) throws Exception {
- output.collect(element.replace(userFunction.map2(element.getValue())));
- }
-
- @Override
- public void processWatermark1(Watermark mark) throws Exception {
- input1Watermark = mark.getTimestamp();
- long newMin = Math.min(input1Watermark, input2Watermark);
- if (newMin > combinedWatermark) {
- combinedWatermark = newMin;
- output.emitWatermark(new Watermark(combinedWatermark));
- }
- }
-
- @Override
- public void processWatermark2(Watermark mark) throws Exception {
- input2Watermark = mark.getTimestamp();
- long newMin = Math.min(input1Watermark, input2Watermark);
- if (newMin > combinedWatermark) {
- combinedWatermark = newMin;
- output.emitWatermark(new Watermark(combinedWatermark));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
deleted file mode 100644
index f9c95f5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * This represents a feedback point in a topology. The type of the feedback elements must not match
- * the type of the upstream {@code StreamTransformation} because the only allowed operations
- * after a {@code CoFeedbackTransformation} are
- * {@link org.apache.flink.streaming.api.transformations.TwoInputTransformation TwoInputTransformations}.
- * The upstream {@code StreamTransformation} will be connected to the first input of the Co-Transform
- * while the feedback edges will be connected to the second input.
- *
- * <p>
- * Both the partitioning of the input and the feedback edges is preserved. They can also have
- * differing partitioning strategies. This requires, however, that the parallelism of the feedback
- * {@code StreamTransformations} must match the parallelism of the input
- * {@code StreamTransformation}.
- *
- * <p>
- * The upstream {@code StreamTransformation} is not wired to this {@code CoFeedbackTransformation}.
- * It is instead directly wired to the {@code TwoInputTransformation} after this
- * {@code CoFeedbackTransformation}.
- *
- * <p>
- * This is different from Iterations in batch processing.
- * @see org.apache.flink.streaming.api.transformations.FeedbackTransformation
- *
- * @param <F> The type of the feedback elements.
- *
- */
-public class CoFeedbackTransformation<F> extends StreamTransformation<F> {
-
- private final List<StreamTransformation<F>> feedbackEdges;
-
- private final Long waitTime;
-
- /**
- * Creates a new {@code CoFeedbackTransformation} from the given input.
- *
- * @param parallelism The parallelism of the upstream {@code StreamTransformatino} and the
- * feedback edges.
- * @param feedbackType The type of the feedback edges
- * @param waitTime The wait time of the feedback operator. After the time expires
- * the operation will close and not receive any more feedback elements.
- */
- public CoFeedbackTransformation(int parallelism,
- TypeInformation<F> feedbackType,
- Long waitTime) {
- super("CoFeedback", feedbackType, parallelism);
- this.waitTime = waitTime;
- this.feedbackEdges = Lists.newArrayList();
- }
-
- /**
- * Adds a feedback edge. The parallelism of the {@code StreamTransformation} must match
- * the parallelism of the input {@code StreamTransformation} of the upstream
- * {@code StreamTransformation}.
- *
- * @param transform The new feedback {@code StreamTransformation}.
- */
- public void addFeedbackEdge(StreamTransformation<F> transform) {
-
- if (transform.getParallelism() != this.getParallelism()) {
- throw new UnsupportedOperationException(
- "Parallelism of the feedback stream must match the parallelism of the original" +
- " stream. Parallelism of original stream: " + this.getParallelism() +
- "; parallelism of feedback stream: " + transform.getParallelism());
- }
-
- feedbackEdges.add(transform);
- }
-
- /**
- * Returns the list of feedback {@code StreamTransformations}.
- */
- public List<StreamTransformation<F>> getFeedbackEdges() {
- return feedbackEdges;
- }
-
- /**
- * Returns the wait time. This is the amount of time that the feedback operator keeps listening
- * for feedback elements. Once the time expires the operation will close and will not receive
- * further elements.
- */
- public Long getWaitTime() {
- return waitTime;
- }
-
- @Override
- public final void setChainingStrategy(ChainingStrategy strategy) {
- throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
- }
-
- @Override
- public Collection<StreamTransformation<?>> getTransitivePredecessors() {
- return Collections.<StreamTransformation<?>>singleton(this);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
deleted file mode 100644
index 87c7f16..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This represents a feedback point in a topology.
- *
- * <p>
- * This is different from how iterations work in batch processing. Once a feedback point is defined
- * you can connect one or several {@code StreamTransformations} as a feedback edges. Operations
- * downstream from the feedback point will receive elements from the input of this feedback point
- * and from the feedback edges.
- *
- * <p>
- * Both the partitioning of the input and the feedback edges is preserved. They can also have
- * differing partitioning strategies. This requires, however, that the parallelism of the feedback
- * {@code StreamTransformations} must match the parallelism of the input
- * {@code StreamTransformation}.
- *
- * <p>
- * The type of the input {@code StreamTransformation} and the feedback {@code StreamTransformation}
- * must match.
- *
- * @param <T> The type of the input elements and the feedback elements.
- */
-public class FeedbackTransformation<T> extends StreamTransformation<T> {
-
- private final StreamTransformation<T> input;
-
- private final List<StreamTransformation<T>> feedbackEdges;
-
- private final Long waitTime;
-
- /**
- * Creates a new {@code FeedbackTransformation} from the given input.
- *
- * @param input The input {@code StreamTransformation}
- * @param waitTime The wait time of the feedback operator. After the time expires
- * the operation will close and not receive any more feedback elements.
- */
- public FeedbackTransformation(StreamTransformation<T> input, Long waitTime) {
- super("Feedback", input.getOutputType(), input.getParallelism());
- this.input = input;
- this.waitTime = waitTime;
- this.feedbackEdges = Lists.newArrayList();
- }
-
- /**
- * Returns the input {@code StreamTransformation} of this {@code FeedbackTransformation}.
- */
- public StreamTransformation<T> getInput() {
- return input;
- }
-
- /**
- * Adds a feedback edge. The parallelism of the {@code StreamTransformation} must match
- * the parallelism of the input {@code StreamTransformation} of this
- * {@code FeedbackTransformation}
- *
- * @param transform The new feedback {@code StreamTransformation}.
- */
- public void addFeedbackEdge(StreamTransformation<T> transform) {
-
- if (transform.getParallelism() != this.getParallelism()) {
- throw new UnsupportedOperationException(
- "Parallelism of the feedback stream must match the parallelism of the original" +
- " stream. Parallelism of original stream: " + this.getParallelism() +
- "; parallelism of feedback stream: " + transform.getParallelism());
- }
-
- feedbackEdges.add(transform);
- }
-
- /**
- * Returns the list of feedback {@code StreamTransformations}.
- */
- public List<StreamTransformation<T>> getFeedbackEdges() {
- return feedbackEdges;
- }
-
- /**
- * Returns the wait time. This is the amount of time that the feedback operator keeps listening
- * for feedback elements. Once the time expires the operation will close and will not receive
- * further elements.
- */
- public Long getWaitTime() {
- return waitTime;
- }
-
- @Override
- public final void setChainingStrategy(ChainingStrategy strategy) {
- throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
- }
-
- @Override
- public Collection<StreamTransformation<?>> getTransitivePredecessors() {
- List<StreamTransformation<?>> result = Lists.newArrayList();
- result.add(this);
- result.addAll(input.getTransitivePredecessors());
- return result;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
deleted file mode 100644
index 031c481..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This Transformation represents the application of a
- * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} to one input
- * {@link org.apache.flink.streaming.api.transformations.StreamTransformation}.
- *
- * @param <IN> The type of the elements in the nput {@code StreamTransformation}
- * @param <OUT> The type of the elements that result from this {@code OneInputTransformation}
- */
-public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {
-
- private final StreamTransformation<IN> input;
-
- private final OneInputStreamOperator<IN, OUT> operator;
-
- private KeySelector<IN, ?> stateKeySelector;
-
- private TypeInformation<?> stateKeyType;
-
- /**
- * Creates a new {@code OneInputTransformation} from the given input and operator.
- *
- * @param input The input {@code StreamTransformation}
- * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
- * @param operator The {@code TwoInputStreamOperator}
- * @param outputType The type of the elements produced by this {@code OneInputTransformation}
- * @param parallelism The parallelism of this {@code OneInputTransformation}
- */
- public OneInputTransformation(
- StreamTransformation<IN> input,
- String name,
- OneInputStreamOperator<IN, OUT> operator,
- TypeInformation<OUT> outputType,
- int parallelism) {
- super(name, outputType, parallelism);
- this.input = input;
- this.operator = operator;
- }
-
- /**
- * Returns the input {@code StreamTransformation} of this {@code OneInputTransformation}.
- */
- public StreamTransformation<IN> getInput() {
- return input;
- }
-
- /**
- * Returns the {@code TypeInformation} for the elements of the input.
- */
- public TypeInformation<IN> getInputType() {
- return input.getOutputType();
- }
-
- /**
- * Returns the {@code TwoInputStreamOperator} of this Transformation.
- */
- public OneInputStreamOperator<IN, OUT> getOperator() {
- return operator;
- }
-
- /**
- * Sets the {@link KeySelector} that must be used for partitioning keyed state of this operation.
- *
- * @param stateKeySelector The {@code KeySelector} to set
- */
- public void setStateKeySelector(KeySelector<IN, ?> stateKeySelector) {
- this.stateKeySelector = stateKeySelector;
- }
-
- /**
- * Returns the {@code KeySelector} that must be used for partitioning keyed state in this
- * Operation.
- *
- * @see #setStateKeySelector
- */
- public KeySelector<IN, ?> getStateKeySelector() {
- return stateKeySelector;
- }
-
- public void setStateKeyType(TypeInformation<?> stateKeyType) {
- this.stateKeyType = stateKeyType;
- }
-
- public TypeInformation<?> getStateKeyType() {
- return stateKeyType;
- }
-
- @Override
- public Collection<StreamTransformation<?>> getTransitivePredecessors() {
- List<StreamTransformation<?>> result = Lists.newArrayList();
- result.add(this);
- result.addAll(input.getTransitivePredecessors());
- return result;
- }
-
- @Override
- public final void setChainingStrategy(ChainingStrategy strategy) {
- operator.setChainingStrategy(strategy);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
deleted file mode 100644
index fa85349..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This transformation represents a change of partitioning of the input elements.
- *
- * <p>
- * This does not create a physical operation, it only affects how upstream operations are
- * connected to downstream operations.
- *
- * @param <T> The type of the elements that result from this {@code PartitionTransformation}
- */
-public class PartitionTransformation<T> extends StreamTransformation<T> {
-
- private final StreamTransformation<T> input;
- private final StreamPartitioner<T> partitioner;
-
- /**
- * Creates a new {@code PartitionTransformation} from the given input and
- * {@link StreamPartitioner}.
- *
- * @param input The input {@code StreamTransformation}
- * @param partitioner The {@code StreamPartitioner}
- */
- public PartitionTransformation(StreamTransformation<T> input, StreamPartitioner<T> partitioner) {
- super("Partition", input.getOutputType(), input.getParallelism());
- this.input = input;
- this.partitioner = partitioner;
- }
-
- /**
- * Returns the input {@code StreamTransformation} of this {@code SinkTransformation}.
- */
- public StreamTransformation<T> getInput() {
- return input;
- }
-
- /**
- * Returns the {@code StreamPartitioner} that must be used for partitioning the elements
- * of the input {@code StreamTransformation}.
- */
- public StreamPartitioner<T> getPartitioner() {
- return partitioner;
- }
-
- @Override
- public Collection<StreamTransformation<?>> getTransitivePredecessors() {
- List<StreamTransformation<?>> result = Lists.newArrayList();
- result.add(this);
- result.addAll(input.getTransitivePredecessors());
- return result;
- }
-
- @Override
- public final void setChainingStrategy(ChainingStrategy strategy) {
- throw new UnsupportedOperationException("Cannot set chaining strategy on Union Transformation.");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
deleted file mode 100644
index a66b65a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This transformation represents a selection of only certain upstream elements. This must
- * follow a {@link org.apache.flink.streaming.api.transformations.SplitTransformation} that
- * splits elements into several logical streams with assigned names.
- *
- * <p>
- * This does not create a physical operation, it only affects how upstream operations are
- * connected to downstream operations.
- *
- * @param <T> The type of the elements that result from this {@code SelectTransformation}
- */
-public class SelectTransformation<T> extends StreamTransformation<T> {
-
- private final StreamTransformation<T> input;
- private final List<String> selectedNames;
-
- /**
- * Creates a new {@code SelectionTransformation} from the given input that only selects
- * the streams with the selected names.
- *
- * @param input The input {@code StreamTransformation}
- * @param selectedNames The names from the upstream {@code SplitTransformation} that this
- * {@code SelectTransformation} selects.
- */
- public SelectTransformation(StreamTransformation<T> input,
- List<String> selectedNames) {
- super("Select", input.getOutputType(), input.getParallelism());
- this.input = input;
- this.selectedNames = selectedNames;
- }
-
- /**
- * Returns the input {@code StreamTransformation}.
- */
- public StreamTransformation<T> getInput() {
- return input;
- }
-
- /**
- * Returns the names of the split streams that this {@code SelectTransformation} selects.
- */
- public List<String> getSelectedNames() {
- return selectedNames;
- }
-
- @Override
- public Collection<StreamTransformation<?>> getTransitivePredecessors() {
- List<StreamTransformation<?>> result = Lists.newArrayList();
- result.add(this);
- result.addAll(input.getTransitivePredecessors());
- return result;
- }
-
- @Override
- public final void setChainingStrategy(ChainingStrategy strategy) {
- throw new UnsupportedOperationException("Cannot set chaining strategy on Select Transformation.");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
deleted file mode 100644
index 84ad6db..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamSink;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This Transformation represents a Sink.
- *
- * @param <T> The type of the elements in the input {@code SinkTransformation}
- */
-public class SinkTransformation<T> extends StreamTransformation<Object> {
-
- private final StreamTransformation<T> input;
-
- private final StreamSink<T> operator;
-
- // We need this because sinks can also have state that is partitioned by key
- private KeySelector<T, ?> stateKeySelector;
-
- private TypeInformation<?> stateKeyType;
-
- /**
- * Creates a new {@code SinkTransformation} from the given input {@code StreamTransformation}.
- *
- * @param input The input {@code StreamTransformation}
- * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
- * @param operator The sink operator
- * @param parallelism The parallelism of this {@code SinkTransformation}
- */
- public SinkTransformation(
- StreamTransformation<T> input,
- String name,
- StreamSink<T> operator,
- int parallelism) {
- super(name, TypeExtractor.getForClass(Object.class), parallelism);
- this.input = input;
- this.operator = operator;
- }
-
- /**
- * Returns the input {@code StreamTransformation} of this {@code SinkTransformation}.
- */
- public StreamTransformation<T> getInput() {
- return input;
- }
-
- /**
- * Returns the {@link StreamSink} that is the operator of this {@code SinkTransformation}.
- */
- public StreamSink<T> getOperator() {
- return operator;
- }
-
- /**
- * Sets the {@link KeySelector} that must be used for partitioning keyed state of this Sink.
- *
- * @param stateKeySelector The {@code KeySelector} to set
- */
- public void setStateKeySelector(KeySelector<T, ?> stateKeySelector) {
- this.stateKeySelector = stateKeySelector;
- }
-
- /**
- * Returns the {@code KeySelector} that must be used for partitioning keyed state in this
- * Sink.
- *
- * @see #setStateKeySelector
- */
- public KeySelector<T, ?> getStateKeySelector() {
- return stateKeySelector;
- }
-
- public void setStateKeyType(TypeInformation<?> stateKeyType) {
- this.stateKeyType = stateKeyType;
- }
-
- public TypeInformation<?> getStateKeyType() {
- return stateKeyType;
- }
-
- @Override
- public Collection<StreamTransformation<?>> getTransitivePredecessors() {
- List<StreamTransformation<?>> result = Lists.newArrayList();
- result.add(this);
- result.addAll(input.getTransitivePredecessors());
- return result;
- }
-
- @Override
- public final void setChainingStrategy(ChainingStrategy strategy) {
- operator.setChainingStrategy(strategy);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
deleted file mode 100644
index 9835606..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamSource;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * This represents a Source. This does not actually transform anything since it has no inputs but
- * it is the root {@code StreamTransformation} of any topology.
- *
- * @param <T> The type of the elements that this source produces
- */
-public class SourceTransformation<T> extends StreamTransformation<T> {
-
- private final StreamSource<T> operator;
-
- /**
- * Creates a new {@code SourceTransformation} from the given operator.
- *
- * @param name The name of the {@code SourceTransformation}, this will be shown in Visualizations and the Log
- * @param operator The {@code StreamSource} that is the operator of this Transformation
- * @param outputType The type of the elements produced by this {@code SourceTransformation}
- * @param parallelism The parallelism of this {@code SourceTransformation}
- */
- public SourceTransformation(
- String name,
- StreamSource<T> operator,
- TypeInformation<T> outputType,
- int parallelism) {
- super(name, outputType, parallelism);
- this.operator = operator;
- }
-
- /**
- * Returns the {@code StreamSource}, the operator of this {@code SourceTransformation}.
- */
- public StreamSource<T> getOperator() {
- return operator;
- }
-
- @Override
- public Collection<StreamTransformation<?>> getTransitivePredecessors() {
- return Collections.<StreamTransformation<?>>singleton(this);
- }
-
- @Override
- public final void setChainingStrategy(ChainingStrategy strategy) {
- operator.setChainingStrategy(strategy);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
deleted file mode 100644
index 96c1c9e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This transformation represents a split of one
- * {@link org.apache.flink.streaming.api.datastream.DataStream} into several {@code DataStreams}
- * using an {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
- *
- * <p>
- * This does not create a physical operation, it only affects how upstream operations are
- * connected to downstream operations.
- *
- * @param <T> The type of the elements that result from this {@code SplitTransformation}
- */
-public class SplitTransformation<T> extends StreamTransformation<T> {
-
- private final StreamTransformation<T> input;
-
- private final OutputSelector<T> outputSelector;
-
- /**
- * Creates a new {@code SplitTransformation} from the given input and {@code OutputSelector}.
- *
- * @param input The input {@code StreamTransformation}
- * @param outputSelector The output selector
- */
- public SplitTransformation(StreamTransformation<T> input,
- OutputSelector<T> outputSelector) {
- super("Split", input.getOutputType(), input.getParallelism());
- this.input = input;
- this.outputSelector = outputSelector;
- }
-
- /**
- * Returns the input {@code StreamTransformation}.
- */
- public StreamTransformation<T> getInput() {
- return input;
- }
-
- /**
- * Returns the {@code OutputSelector}
- */
- public OutputSelector<T> getOutputSelector() {
- return outputSelector;
- }
-
- @Override
- public Collection<StreamTransformation<?>> getTransitivePredecessors() {
- List<StreamTransformation<?>> result = Lists.newArrayList();
- result.add(this);
- result.addAll(input.getTransitivePredecessors());
- return result;
- }
-
- @Override
- public final void setChainingStrategy(ChainingStrategy strategy) {
- throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
deleted file mode 100644
index 4e6dc42..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.MissingTypeInfo;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-
-import java.util.Collection;
-
-/**
- * A {@code StreamTransformation} represents the operation that creates a
- * {@link org.apache.flink.streaming.api.datastream.DataStream}. Every
- * {@link org.apache.flink.streaming.api.datastream.DataStream} has an underlying
- * {@code StreamTransformation} that is the origin of said DataStream.
- *
- * <p>
- * API operations such as {@link org.apache.flink.streaming.api.datastream.DataStream#map} create
- * a tree of {@code StreamTransformation}s underneath. When the stream program is to be executed this
- * graph is translated to a {@link StreamGraph} using
- * {@link org.apache.flink.streaming.api.graph.StreamGraphGenerator}.
- *
- * <p>
- * A {@code StreamTransformation} does not necessarily correspond to a physical operation
- * at runtime. Some operations are only logical concepts. Examples of this are union,
- * split/select data stream, partitioning.
- *
- * <p>
- * The following graph of {@code StreamTransformations}:
- *
- * <pre>
- * Source Source
- * + +
- * | |
- * v v
- * Rebalance HashPartition
- * + +
- * | |
- * | |
- * +------>Union<------+
- * +
- * |
- * v
- * Split
- * +
- * |
- * v
- * Select
- * +
- * v
- * Map
- * +
- * |
- * v
- * Sink
- * </pre>
- *
- * Would result in this graph of operations at runtime:
- *
- * <pre>
- * Source Source
- * + +
- * | |
- * | |
- * +------->Map<-------+
- * +
- * |
- * v
- * Sink
- * </pre>
- *
- * The information about partitioning, union, split/select end up being encoded in the edges
- * that connect the sources to the map operation.
- *
- * @param <T> The type of the elements that result from this {@code StreamTransformation}
- */
-public abstract class StreamTransformation<T> {
-
- // This is used to assign a unique ID to every StreamTransformation
- protected static Integer idCounter = 0;
- public static int getNewNodeId() {
- idCounter++;
- return idCounter;
- }
-
- protected final int id;
-
- protected String name;
-
- protected TypeInformation<T> outputType;
- // This is used to handle MissingTypeInfo. As long as the outputType has not been queried
- // it can still be changed using setOutputType(). Afterwards an exception is thrown when
- // trying to change the output type.
- protected boolean typeUsed;
-
- private int parallelism;
-
- protected long bufferTimeout = -1;
-
- protected StreamGraph.ResourceStrategy resourceStrategy = StreamGraph.ResourceStrategy.DEFAULT;
-
- /**
- * Creates a new {@code StreamTransformation} with the given name, output type and parallelism.
- *
- * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
- * @param outputType The output type of this {@code StreamTransformation}
- * @param parallelism The parallelism of this {@code StreamTransformation}
- */
- public StreamTransformation(String name, TypeInformation<T> outputType, int parallelism) {
- this.id = getNewNodeId();
- this.name = Preconditions.checkNotNull(name);
- this.outputType = outputType;
- this.parallelism = parallelism;
- }
-
- /**
- * Returns the unique ID of this {@code StreamTransformation}.
- */
- public int getId() {
- return id;
- }
-
- /**
- * Changes the name of this {@code StreamTransformation}.
- */
- public void setName(String name) {
- this.name = name;
- }
-
- /**
- * Returns the name of this {@code StreamTransformation}.
- */
- public String getName() {
- return name;
- }
-
- /**
- * Returns the parallelism of this {@code StreamTransformation}
- */
- public int getParallelism() {
- return parallelism;
- }
-
- /**
- * Sets the parallelism of this {@code StreamTransformation}
- * @param parallelism The new parallelism to set on this {@code StreamTransformation}
- */
- public void setParallelism(int parallelism) {
- Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero.");
- this.parallelism = parallelism;
- }
-
- /**
- * Tries to fill in the type information. Type information can be filled in
- * later when the program uses a type hint. This method checks whether the
- * type information has ever been accessed before and does not allow
- * modifications if the type was accessed already. This ensures consistency
- * by making sure different parts of the operation do not assume different
- * type information.
- *
- * @param outputType The type information to fill in.
- *
- * @throws IllegalStateException Thrown, if the type information has been accessed before.
- */
- public void setOutputType(TypeInformation<T> outputType) {
- if (typeUsed) {
- throw new IllegalStateException(
- "TypeInformation cannot be filled in for the type after it has been used. "
- + "Please make sure that the type info hints are the first call after"
- + " the transformation function, "
- + "before any access to types or semantic properties, etc.");
- }
- this.outputType = outputType;
- }
-
- /**
- * Returns the output type of this {@code StreamTransformation} as a {@link TypeInformation}. Once
- * this is used once the output type cannot be changed anymore using {@link #setOutputType}.
- *
- * @return The output type of this {@code StreamTransformation}
- */
- public TypeInformation<T> getOutputType() {
- if (outputType instanceof MissingTypeInfo) {
- MissingTypeInfo typeInfo = (MissingTypeInfo) this.outputType;
- throw new InvalidTypesException(
- "The return type of function '"
- + typeInfo.getFunctionName()
- + "' could not be determined automatically, due to type erasure. "
- + "You can give type information hints by using the returns(...) "
- + "method on the result of the transformation call, or by letting "
- + "your function implement the 'ResultTypeQueryable' "
- + "interface.", typeInfo.getTypeException());
- }
- typeUsed = true;
- return this.outputType;
- }
-
- /**
- * Sets the chaining strategy of this {@code StreamTransformation}.
- */
- public abstract void setChainingStrategy(ChainingStrategy strategy);
-
- /**
- * Set the buffer timeout of this {@code StreamTransformation}. The timeout is used when
- * sending elements over the network. The timeout specifies how long a network buffer
- * should be kept waiting before sending. A higher timeout means that more elements will
- * be sent in one buffer, this increases throughput. The latency, however, is negatively
- * affected by a higher timeout.
- */
- public void setBufferTimeout(long bufferTimeout) {
- this.bufferTimeout = bufferTimeout;
- }
-
- /**
- * Returns the buffer timeout of this {@code StreamTransformation}.
- *
- * <p>
- * {@see #setBufferTimeout}
- */
- public long getBufferTimeout() {
- return bufferTimeout;
- }
-
- /**
- * Sets the {@link org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy} of this
- * {@code StreamTransformation}. The resource strategy is used when scheduling operations on actual
- * workers when transforming the StreamTopology to an
- * {@link org.apache.flink.runtime.executiongraph.ExecutionGraph}.
- */
- public void setResourceStrategy(StreamGraph.ResourceStrategy resourceStrategy) {
- this.resourceStrategy = resourceStrategy;
- }
-
- /**
- * Returns the {@code ResourceStrategy} of this {@code StreamTransformation}.
- *
- * <p>
- * {@see #setResourceStrategy}
- */
- public StreamGraph.ResourceStrategy getResourceStrategy() {
- return resourceStrategy;
- }
-
- /**
- * Returns all transitive predecessor {@code StreamTransformation}s of this {@code StreamTransformation}. This
- * is, for example, used when determining whether a feedback edge of an iteration
- * actually has the iteration head as a predecessor.
- *
- * @return The list of transitive predecessors.
- */
- public abstract Collection<StreamTransformation<?>> getTransitivePredecessors();
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "{" +
- "id=" + id +
- ", name='" + name + '\'' +
- ", outputType=" + outputType +
- ", parallelism=" + parallelism +
- '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof StreamTransformation)) {
- return false;
- }
-
- StreamTransformation<?> that = (StreamTransformation<?>) o;
-
- if (bufferTimeout != that.bufferTimeout) {
- return false;
- }
- if (id != that.id) {
- return false;
- }
- if (parallelism != that.parallelism) {
- return false;
- }
- if (!name.equals(that.name)) {
- return false;
- }
- if (outputType != null ? !outputType.equals(that.outputType) : that.outputType != null) {
- return false;
- }
- return resourceStrategy == that.resourceStrategy;
- }
-
- @Override
- public int hashCode() {
- int result = id;
- result = 31 * result + name.hashCode();
- result = 31 * result + (outputType != null ? outputType.hashCode() : 0);
- result = 31 * result + parallelism;
- result = 31 * result + (int) (bufferTimeout ^ (bufferTimeout >>> 32));
- result = 31 * result + resourceStrategy.hashCode();
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
deleted file mode 100644
index 30f0733..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This Transformation represents the application of a
- * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to two input
- * {@code StreamTransformations}. The result is again only one stream.
- *
- * @param <IN1> The type of the elements in the first input {@code StreamTransformation}
- * @param <IN2> The type of the elements in the second input {@code StreamTransformation}
- * @param <OUT> The type of the elements that result from this {@code TwoInputTransformation}
- */
-public class TwoInputTransformation<IN1, IN2, OUT> extends StreamTransformation<OUT> {
-
- private final StreamTransformation<IN1> input1;
- private final StreamTransformation<IN2> input2;
-
- private final TwoInputStreamOperator<IN1, IN2, OUT> operator;
-
- /**
- * Creates a new {@code TwoInputTransformation} from the given inputs and operator.
- *
- * @param input1 The first input {@code StreamTransformation}
- * @param input2 The second input {@code StreamTransformation}
- * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
- * @param operator The {@code TwoInputStreamOperator}
- * @param outputType The type of the elements produced by this Transformation
- * @param parallelism The parallelism of this Transformation
- */
- public TwoInputTransformation(
- StreamTransformation<IN1> input1,
- StreamTransformation<IN2> input2,
- String name,
- TwoInputStreamOperator<IN1, IN2, OUT> operator,
- TypeInformation<OUT> outputType,
- int parallelism) {
- super(name, outputType, parallelism);
- this.input1 = input1;
- this.input2 = input2;
- this.operator = operator;
- }
-
- /**
- * Returns the first input {@code StreamTransformation} of this {@code TwoInputTransformation}.
- */
- public StreamTransformation<IN1> getInput1() {
- return input1;
- }
-
- /**
- * Returns the first input {@code StreamTransformation} of this {@code TwoInputTransformation}.
- */
- public StreamTransformation<IN2> getInput2() {
- return input2;
- }
-
- /**
- * Returns the {@code TypeInformation} for the elements from the first input.
- */
- public TypeInformation<IN1> getInputType1() {
- return input1.getOutputType();
- }
-
- /**
- * Returns the {@code TypeInformation} for the elements from the first input.
- */
- public TypeInformation<IN2> getInputType2() {
- return input2.getOutputType();
- }
-
- /**
- * Returns the {@code TwoInputStreamOperator} of this Transformation.
- */
- public TwoInputStreamOperator<IN1, IN2, OUT> getOperator() {
- return operator;
- }
-
- @Override
- public Collection<StreamTransformation<?>> getTransitivePredecessors() {
- List<StreamTransformation<?>> result = Lists.newArrayList();
- result.add(this);
- result.addAll(input1.getTransitivePredecessors());
- result.addAll(input2.getTransitivePredecessors());
- return result;
- }
-
- @Override
- public final void setChainingStrategy(ChainingStrategy strategy) {
- operator.setChainingStrategy(strategy);
- }
-
-}