You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:52 UTC
[36/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
deleted file mode 100644
index 45cfff1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
-import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
-import org.apache.flink.util.InstantiationUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamingJobGraphGenerator {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
-
- private StreamGraph streamGraph;
-
- private Map<Integer, JobVertex> jobVertices;
- private JobGraph jobGraph;
- private Collection<Integer> builtVertices;
-
- private List<StreamEdge> physicalEdgesInOrder;
-
- private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
-
- private Map<Integer, StreamConfig> vertexConfigs;
- private Map<Integer, String> chainedNames;
-
- public StreamingJobGraphGenerator(StreamGraph streamGraph) {
- this.streamGraph = streamGraph;
- }
-
- private void init() {
- this.jobVertices = new HashMap<Integer, JobVertex>();
- this.builtVertices = new HashSet<Integer>();
- this.chainedConfigs = new HashMap<Integer, Map<Integer, StreamConfig>>();
- this.vertexConfigs = new HashMap<Integer, StreamConfig>();
- this.chainedNames = new HashMap<Integer, String>();
- this.physicalEdgesInOrder = new ArrayList<StreamEdge>();
- }
-
- public JobGraph createJobGraph(String jobName) {
- jobGraph = new JobGraph(jobName);
-
- // make sure that all vertices start immediately
- jobGraph.setScheduleMode(ScheduleMode.ALL);
-
- init();
-
- setChaining();
-
- setPhysicalEdges();
-
- setSlotSharing();
-
- configureCheckpointing();
-
- configureExecutionRetries();
-
- configureExecutionRetryDelay();
-
- try {
- InstantiationUtil.writeObjectToConfig(this.streamGraph.getExecutionConfig(), this.jobGraph.getJobConfiguration(), ExecutionConfig.CONFIG_KEY);
- } catch (IOException e) {
- throw new RuntimeException("Config object could not be written to Job Configuration: ", e);
- }
-
- return jobGraph;
- }
-
- private void setPhysicalEdges() {
- Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();
-
- for (StreamEdge edge : physicalEdgesInOrder) {
- int target = edge.getTargetId();
-
- List<StreamEdge> inEdges = physicalInEdgesInOrder.get(target);
-
- // create if not set
- if (inEdges == null) {
- inEdges = new ArrayList<StreamEdge>();
- physicalInEdgesInOrder.put(target, inEdges);
- }
-
- inEdges.add(edge);
- }
-
- for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
- int vertex = inEdges.getKey();
- List<StreamEdge> edgeList = inEdges.getValue();
-
- vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
- }
- }
-
- private void setChaining() {
- for (Integer sourceName : streamGraph.getSourceIDs()) {
- createChain(sourceName, sourceName);
- }
- }
-
- private List<StreamEdge> createChain(Integer startNode, Integer current) {
-
- if (!builtVertices.contains(startNode)) {
-
- List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
-
- List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
- List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
-
- for (StreamEdge outEdge : streamGraph.getStreamNode(current).getOutEdges()) {
- if (isChainable(outEdge)) {
- chainableOutputs.add(outEdge);
- } else {
- nonChainableOutputs.add(outEdge);
- }
- }
-
- for (StreamEdge chainable : chainableOutputs) {
- transitiveOutEdges.addAll(createChain(startNode, chainable.getTargetId()));
- }
-
- for (StreamEdge nonChainable : nonChainableOutputs) {
- transitiveOutEdges.add(nonChainable);
- createChain(nonChainable.getTargetId(), nonChainable.getTargetId());
- }
-
- chainedNames.put(current, createChainedName(current, chainableOutputs));
-
- StreamConfig config = current.equals(startNode) ? createProcessingVertex(startNode)
- : new StreamConfig(new Configuration());
-
- setVertexConfig(current, config, chainableOutputs, nonChainableOutputs);
-
- if (current.equals(startNode)) {
-
- config.setChainStart();
- config.setOutEdgesInOrder(transitiveOutEdges);
- config.setOutEdges(streamGraph.getStreamNode(current).getOutEdges());
-
- for (StreamEdge edge : transitiveOutEdges) {
- connect(startNode, edge);
- }
-
- config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNode));
-
- } else {
-
- Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNode);
-
- if (chainedConfs == null) {
- chainedConfigs.put(startNode, new HashMap<Integer, StreamConfig>());
- }
- chainedConfigs.get(startNode).put(current, config);
- }
-
- return transitiveOutEdges;
-
- } else {
- return new ArrayList<StreamEdge>();
- }
- }
-
- private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
- String operatorName = streamGraph.getStreamNode(vertexID).getOperatorName();
- if (chainedOutputs.size() > 1) {
- List<String> outputChainedNames = new ArrayList<String>();
- for (StreamEdge chainable : chainedOutputs) {
- outputChainedNames.add(chainedNames.get(chainable.getTargetId()));
- }
- return operatorName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")";
- } else if (chainedOutputs.size() == 1) {
- return operatorName + " -> " + chainedNames.get(chainedOutputs.get(0).getTargetId());
- } else {
- return operatorName;
- }
-
- }
-
- private StreamConfig createProcessingVertex(Integer vertexID) {
-
- JobVertex jobVertex = new JobVertex(chainedNames.get(vertexID));
- StreamNode vertex = streamGraph.getStreamNode(vertexID);
-
- jobVertex.setInvokableClass(vertex.getJobVertexClass());
-
- int parallelism = vertex.getParallelism();
-
- if (parallelism > 0) {
- jobVertex.setParallelism(parallelism);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Parallelism set: {} for {}", parallelism, vertexID);
- }
-
- if (vertex.getInputFormat() != null) {
- jobVertex.setInputSplitSource(vertex.getInputFormat());
- }
-
- jobVertices.put(vertexID, jobVertex);
- builtVertices.add(vertexID);
- jobGraph.addVertex(jobVertex);
-
- return new StreamConfig(jobVertex.getConfiguration());
- }
-
- @SuppressWarnings("unchecked")
- private void setVertexConfig(Integer vertexID, StreamConfig config,
- List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
-
- StreamNode vertex = streamGraph.getStreamNode(vertexID);
-
- config.setVertexID(vertexID);
- config.setBufferTimeout(vertex.getBufferTimeout());
-
- config.setTypeSerializerIn1(vertex.getTypeSerializerIn1());
- config.setTypeSerializerIn2(vertex.getTypeSerializerIn2());
- config.setTypeSerializerOut(vertex.getTypeSerializerOut());
-
- config.setStreamOperator(vertex.getOperator());
- config.setOutputSelectorWrapper(vertex.getOutputSelectorWrapper());
-
- config.setNumberOfOutputs(nonChainableOutputs.size());
- config.setNonChainedOutputs(nonChainableOutputs);
- config.setChainedOutputs(chainableOutputs);
-
- config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
- if (streamGraph.isCheckpointingEnabled()) {
- config.setCheckpointMode(streamGraph.getCheckpointingMode());
- config.setStateBackend(streamGraph.getStateBackend());
- } else {
- // the at least once input handler is slightly cheaper (in the absence of checkpoints),
- // so we use that one if checkpointing is not enabled
- config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
- }
- config.setStatePartitioner((KeySelector<?, Serializable>) vertex.getStatePartitioner());
- config.setStateKeySerializer(vertex.getStateKeySerializer());
-
-
- Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
-
- if (vertexClass.equals(StreamIterationHead.class)
- || vertexClass.equals(StreamIterationTail.class)) {
- config.setIterationId(streamGraph.getBrokerID(vertexID));
- config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexID));
- }
-
- List<StreamEdge> allOutputs = new ArrayList<StreamEdge>(chainableOutputs);
- allOutputs.addAll(nonChainableOutputs);
-
- vertexConfigs.put(vertexID, config);
- }
-
- private void connect(Integer headOfChain, StreamEdge edge) {
-
- physicalEdgesInOrder.add(edge);
-
- Integer downStreamvertexID = edge.getTargetId();
-
- JobVertex headVertex = jobVertices.get(headOfChain);
- JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);
-
- StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
-
- downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);
-
- StreamPartitioner<?> partitioner = edge.getPartitioner();
- if (partitioner instanceof ForwardPartitioner) {
- downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
- } else {
- downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
- headOfChain, downStreamvertexID);
- }
- }
-
- private boolean isChainable(StreamEdge edge) {
- StreamNode upStreamVertex = edge.getSourceVertex();
- StreamNode downStreamVertex = edge.getTargetVertex();
-
- StreamOperator<?> headOperator = upStreamVertex.getOperator();
- StreamOperator<?> outOperator = downStreamVertex.getOperator();
-
- return downStreamVertex.getInEdges().size() == 1
- && outOperator != null
- && headOperator != null
- && upStreamVertex.getSlotSharingID() == downStreamVertex.getSlotSharingID()
- && upStreamVertex.getSlotSharingID() != -1
- && (outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS ||
- outOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS)
- && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
- headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS ||
- headOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS)
- && (edge.getPartitioner() instanceof ForwardPartitioner || downStreamVertex
- .getParallelism() == 1)
- && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
- && (streamGraph.isChainingEnabled() ||
- outOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS);
- }
-
- private void setSlotSharing() {
-
- Map<Integer, SlotSharingGroup> slotSharingGroups = new HashMap<Integer, SlotSharingGroup>();
-
- for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
-
- int slotSharingID = streamGraph.getStreamNode(entry.getKey()).getSlotSharingID();
-
- if (slotSharingID != -1) {
- SlotSharingGroup group = slotSharingGroups.get(slotSharingID);
- if (group == null) {
- group = new SlotSharingGroup();
- slotSharingGroups.put(slotSharingID, group);
- }
- entry.getValue().setSlotSharingGroup(group);
- }
- }
-
- for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) {
-
- CoLocationGroup ccg = new CoLocationGroup();
-
- JobVertex source = jobVertices.get(pair.f0.getId());
- JobVertex sink = jobVertices.get(pair.f1.getId());
-
- ccg.addVertex(source);
- ccg.addVertex(sink);
- source.updateCoLocationGroup(ccg);
- sink.updateCoLocationGroup(ccg);
- }
-
- }
-
- private void configureCheckpointing() {
- if (streamGraph.isCheckpointingEnabled()) {
- long interval = streamGraph.getCheckpointingInterval();
- if (interval < 1) {
- throw new IllegalArgumentException("The checkpoint interval must be positive");
- }
-
- // collect the vertices that receive "trigger checkpoint" messages.
- // currently, these are all the sources
- List<JobVertexID> triggerVertices = new ArrayList<JobVertexID>();
-
- // collect the vertices that need to acknowledge the checkpoint
- // currently, these are all vertices
- List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size());
-
- // collect the vertices that receive "commit checkpoint" messages
- // currently, these are all vertices
- List<JobVertexID> commitVertices = new ArrayList<JobVertexID>();
-
-
- for (JobVertex vertex : jobVertices.values()) {
- if (vertex.isInputVertex()) {
- triggerVertices.add(vertex.getID());
- }
- // TODO: add check whether the user function implements the checkpointing interface
- commitVertices.add(vertex.getID());
- ackVertices.add(vertex.getID());
- }
-
- JobSnapshottingSettings settings = new JobSnapshottingSettings(
- triggerVertices, ackVertices, commitVertices, interval);
- jobGraph.setSnapshotSettings(settings);
-
- // if the user enabled checkpointing, the default number of exec retries is infinite.
- int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
- if(executionRetries == -1) {
- streamGraph.getExecutionConfig().setNumberOfExecutionRetries(Integer.MAX_VALUE);
- }
- long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
- if(executionRetryDelay == -1) {
- streamGraph.getExecutionConfig().setExecutionRetryDelay(100 * 1000);
- }
- }
- }
-
- private void configureExecutionRetries() {
- int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
- if (executionRetries != -1) {
- jobGraph.setNumberOfExecutionRetries(executionRetries);
- } else {
- // if the user didn't configure anything, the number of retries is 0.
- jobGraph.setNumberOfExecutionRetries(0);
- }
- }
-
- private void configureExecutionRetryDelay() {
- long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
- if (executionRetryDelay != -1) {
- jobGraph.setExecutionRetryDelay(executionRetryDelay);
- } else {
- jobGraph.setExecutionRetryDelay(100 * 1000);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
deleted file mode 100644
index 078679d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Base class for all stream operators. Operators that contain a user function should extend the class
- * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class).
- *
- * <p>For concrete implementations, one of the following two interfaces must also be implemented, to
- * mark the operator as unary or binary:
- * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
- * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator}.
- *
- * <p>Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
- * the timer service, timer callbacks are also guaranteed not to be called concurrently with
- * methods on {@code StreamOperator}.
- *
- * @param <OUT> The output type of the operator
- */
-public abstract class AbstractStreamOperator<OUT>
- implements StreamOperator<OUT>, java.io.Serializable {
-
- private static final long serialVersionUID = 1L;
-
- /** The logger used by the operator class and its subclasses */
- protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperator.class);
-
- // ----------- configuration properties -------------
-
- // A sane default for most operators
- protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
-
- private boolean inputCopyDisabled = false;
-
- // ---------------- runtime fields ------------------
-
- /** The task that contains this operator (and other operators in the same chain) */
- private transient StreamTask<?, ?> container;
-
- private transient StreamConfig config;
-
- protected transient Output<StreamRecord<OUT>> output;
-
- /** The runtime context for UDFs */
- private transient StreamingRuntimeContext runtimeContext;
-
-
- // ---------------- key/value state ------------------
-
- /** key selector used to get the key for the state. Non-null only if the operator uses key/value state */
- private transient KeySelector<?, ?> stateKeySelector;
-
- private transient KvState<?, ?, ?>[] keyValueStates;
-
- private transient HashMap<String, KvState<?, ?, ?>> keyValueStatesByName;
-
- private transient TypeSerializer<?> keySerializer;
-
- private transient HashMap<String, KvStateSnapshot<?, ?, ?>> keyValueStateSnapshots;
-
- // ------------------------------------------------------------------------
- // Life Cycle
- // ------------------------------------------------------------------------
-
- @Override
- public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
- this.container = containingTask;
- this.config = config;
- this.output = output;
- this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());
- }
-
- /**
- * This method is called immediately before any elements are processed, it should contain the
- * operator's initialization logic.
- *
- * <p>The default implementation does nothing.
- *
- * @throws Exception An exception in this method causes the operator to fail.
- */
- @Override
- public void open() throws Exception {}
-
- /**
- * This method is called after all records have been added to the operators via the methods
- * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
- * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
- * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
- *
- * <p>The method is expected to flush all remaining buffered data. Exceptions during this flushing
- * of buffered data should be propagated, in order to cause the operation to be recognized as failed,
- * because the last data items are not processed properly.
- *
- * @throws Exception An exception in this method causes the operator to fail.
- */
- @Override
- public void close() throws Exception {}
-
- /**
- * This method is called at the very end of the operator's life, both in the case of a successful
- * completion of the operation, and in the case of a failure and canceling.
- *
- * This method is expected to make a thorough effort to release all resources
- * that the operator has acquired.
- */
- @Override
- public void dispose() {
- if (keyValueStates != null) {
- for (KvState<?, ?, ?> state : keyValueStates) {
- state.dispose();
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // Checkpointing
- // ------------------------------------------------------------------------
-
- @Override
- public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
- // here, we deal with key/value state snapshots
-
- StreamTaskState state = new StreamTaskState();
- if (keyValueStates != null) {
- HashMap<String, KvStateSnapshot<?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size());
-
- for (Map.Entry<String, KvState<?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
- KvStateSnapshot<?, ?, ?> snapshot = entry.getValue().shapshot(checkpointId, timestamp);
- snapshots.put(entry.getKey(), snapshot);
- }
-
- state.setKvStates(snapshots);
- }
-
- return state;
- }
-
- @Override
- public void restoreState(StreamTaskState state) throws Exception {
- // restore the key/value state. the actual restore happens lazily, when the function requests
- // the state again, because the restore method needs information provided by the user function
- keyValueStateSnapshots = state.getKvStates();
- }
-
- @Override
- public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
- // by default, nothing needs a notification of checkpoint completion
- }
-
- // ------------------------------------------------------------------------
- // Properties and Services
- // ------------------------------------------------------------------------
-
- /**
- * Gets the execution config defined on the execution environment of the job to which this
- * operator belongs.
- *
- * @return The job's execution config.
- */
- public ExecutionConfig getExecutionConfig() {
- return container.getExecutionConfig();
- }
-
- public StreamConfig getOperatorConfig() {
- return config;
- }
-
- public StreamTask<?, ?> getContainingTask() {
- return container;
- }
-
- public ClassLoader getUserCodeClassloader() {
- return container.getUserCodeClassLoader();
- }
-
- /**
- * Returns a context that allows the operator to query information about the execution and also
- * to interact with systems such as broadcast variables and managed state. It also allows
- * registering timers.
- */
- public StreamingRuntimeContext getRuntimeContext() {
- return runtimeContext;
- }
-
- public StateBackend<?> getStateBackend() {
- return container.getStateBackend();
- }
-
- /**
- * Register a timer callback. At the specified time the {@link Triggerable} will be invoked.
- * This call is guaranteed to not happen concurrently with method calls on the operator.
- *
- * @param time The absolute time in milliseconds.
- * @param target The target to be triggered.
- */
- protected void registerTimer(long time, Triggerable target) {
- container.registerTimer(time, target);
- }
-
- /**
- * Creates a key/value state handle, using the state backend configured for this task.
- *
- * @param stateType The type information for the state type, used for managed memory and state snapshots.
- * @param defaultValue The default value that the state should return for keys that currently have
- * no value associated with them
- *
- * @param <V> The type of the state value.
- *
- * @return The key/value state for this operator.
- *
- * @throws IllegalStateException Thrown, if the key/value state was already initialized.
- * @throws Exception Thrown, if the state backend cannot create the key/value state.
- */
- protected <V> OperatorState<V> createKeyValueState(
- String name, TypeInformation<V> stateType, V defaultValue) throws Exception
- {
- return createKeyValueState(name, stateType.createSerializer(getExecutionConfig()), defaultValue);
- }
-
- /**
- * Creates a key/value state handle, using the state backend configured for this task.
- *
- * @param valueSerializer The type serializer for the state type, used for managed memory and state snapshots.
- * @param defaultValue The default value that the state should return for keys that currently have
- * no value associated with them
- *
- * @param <K> The type of the state key.
- * @param <V> The type of the state value.
- * @param <Backend> The type of the state backend that creates the key/value state.
- *
- * @return The key/value state for this operator.
- *
- * @throws IllegalStateException Thrown, if the key/value state was already initialized.
- * @throws Exception Thrown, if the state backend cannot create the key/value state.
- */
- @SuppressWarnings({"rawtypes", "unchecked"})
- protected <K, V, Backend extends StateBackend<Backend>> OperatorState<V> createKeyValueState(
- String name, TypeSerializer<V> valueSerializer, V defaultValue) throws Exception
- {
- if (name == null || name.isEmpty()) {
- throw new IllegalArgumentException();
- }
- if (keyValueStatesByName != null && keyValueStatesByName.containsKey(name)) {
- throw new IllegalStateException("The key/value state has already been created");
- }
-
- TypeSerializer<K> keySerializer;
-
- // first time state access, make sure we load the state partitioner
- if (stateKeySelector == null) {
- stateKeySelector = config.getStatePartitioner(getUserCodeClassloader());
- if (stateKeySelector == null) {
- throw new UnsupportedOperationException("The function or operator is not executed " +
- "on a KeyedStream and can hence not access the key/value state");
- }
-
- keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
- if (keySerializer == null) {
- throw new Exception("State key serializer has not been configured in the config.");
- }
- this.keySerializer = keySerializer;
- }
- else if (this.keySerializer != null) {
- keySerializer = (TypeSerializer<K>) this.keySerializer;
- }
- else {
- // should never happen, this is merely a safeguard
- throw new RuntimeException();
- }
-
- @SuppressWarnings("unchecked")
- Backend stateBackend = (Backend) container.getStateBackend();
-
- KvState<K, V, Backend> kvstate = null;
-
- // check whether we restore the key/value state from a snapshot, or create a new blank one
- if (keyValueStateSnapshots != null) {
- @SuppressWarnings("unchecked")
- KvStateSnapshot<K, V, Backend> snapshot = (KvStateSnapshot<K, V, Backend>) keyValueStateSnapshots.remove(name);
-
- if (snapshot != null) {
- kvstate = snapshot.restoreState(
- stateBackend, keySerializer, valueSerializer, defaultValue, getUserCodeClassloader());
- }
- }
-
- if (kvstate == null) {
- // create a new blank key/value state
- kvstate = stateBackend.createKvState(keySerializer, valueSerializer, defaultValue);
- }
-
- if (keyValueStatesByName == null) {
- keyValueStatesByName = new HashMap<>();
- }
- keyValueStatesByName.put(name, kvstate);
- keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]);
- return kvstate;
- }
-
- @Override
- @SuppressWarnings({"unchecked", "rawtypes"})
- public void setKeyContextElement(StreamRecord record) throws Exception {
- if (stateKeySelector != null && keyValueStates != null) {
- KeySelector selector = stateKeySelector;
- for (KvState kv : keyValueStates) {
- kv.setCurrentKey(selector.getKey(record.getValue()));
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // Context and chaining properties
- // ------------------------------------------------------------------------
-
- @Override
- public final void setChainingStrategy(ChainingStrategy strategy) {
- this.chainingStrategy = strategy;
- }
-
- @Override
- public final ChainingStrategy getChainingStrategy() {
- return chainingStrategy;
- }
-
- @Override
- public boolean isInputCopyingDisabled() {
- return inputCopyDisabled;
- }
-
- /**
- * Enable object-reuse for this operator instance. This overrides the setting in
- * the {@link org.apache.flink.api.common.ExecutionConfig}
- */
- public void disableInputCopy() {
- this.inputCopyDisabled = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
deleted file mode 100644
index 17bd08d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * This is used as the base class for operators that have a user-defined
- * function. This class handles the opening and closing of the user-defined functions,
- * as part of the operator life cycle.
- *
- * @param <OUT>
- * The output type of the operator
- * @param <F>
- * The type of the user function
- */
-public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> {
-
- private static final long serialVersionUID = 1L;
-
-
- /** the user function */
- protected final F userFunction;
-
- /** Flag to prevent duplicate function.close() calls in close() and dispose() */
- private transient boolean functionsClosed = false;
-
-
- public AbstractUdfStreamOperator(F userFunction) {
- this.userFunction = requireNonNull(userFunction);
- }
-
- /**
- * Gets the user function executed in this operator.
- * @return The user function of this operator.
- */
- public F getUserFunction() {
- return userFunction;
- }
-
- // ------------------------------------------------------------------------
- // operator life cycle
- // ------------------------------------------------------------------------
-
-
- @Override
- public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
- super.setup(containingTask, config, output);
-
- FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
- }
-
- @Override
- public void open() throws Exception {
- super.open();
-
- FunctionUtils.openFunction(userFunction, new Configuration());
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- functionsClosed = true;
- FunctionUtils.closeFunction(userFunction);
- }
-
- @Override
- public void dispose() {
- if (!functionsClosed) {
- functionsClosed = true;
- try {
- FunctionUtils.closeFunction(userFunction);
- }
- catch (Throwable t) {
- LOG.error("Exception while closing user function while failing or canceling task", t);
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // checkpointing and recovery
- // ------------------------------------------------------------------------
-
- @Override
- public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
- StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp);
-
- if (userFunction instanceof Checkpointed) {
- @SuppressWarnings("unchecked")
- Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
-
- Serializable udfState;
- try {
- udfState = chkFunction.snapshotState(checkpointId, timestamp);
- }
- catch (Exception e) {
- throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
- }
-
- if (udfState != null) {
- try {
- StateBackend<?> stateBackend = getStateBackend();
- StateHandle<Serializable> handle =
- stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
- state.setFunctionState(handle);
- }
- catch (Exception e) {
- throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
- + e.getMessage(), e);
- }
- }
- }
-
- return state;
- }
-
- @Override
- public void restoreState(StreamTaskState state) throws Exception {
- super.restoreState(state);
-
- StateHandle<Serializable> stateHandle = state.getFunctionState();
-
- if (userFunction instanceof Checkpointed && stateHandle != null) {
- @SuppressWarnings("unchecked")
- Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
-
- Serializable functionState = stateHandle.getState(getUserCodeClassloader());
- if (functionState != null) {
- try {
- chkFunction.restoreState(functionState);
- }
- catch (Exception e) {
- throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
- }
- }
- }
- }
-
- @Override
- public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
- super.notifyOfCompletedCheckpoint(checkpointId);
-
- if (userFunction instanceof CheckpointNotifier) {
- ((CheckpointNotifier) userFunction).notifyCheckpointComplete(checkpointId);
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- /**
- *
- * Since the streaming API does not implement any parametrization of functions via a
- * configuration, the config returned here is actually empty.
- *
- * @return The user function parameters (currently empty)
- */
- public Configuration getUserFunctionParameters() {
- return new Configuration();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
deleted file mode 100644
index 3a752b0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-/**
- * Defines the chaining scheme for the operator.
- * By default {@link #ALWAYS} is used, which means operators will be eagerly chained whenever possible.
- */
-public enum ChainingStrategy {
-
- /**
- * Chaining will happen even if chaining is disabled on the execution environment.
- * This should only be used by system-level operators, not operators implemented by users.
- */
- FORCE_ALWAYS,
-
- /**
- * Operators will be eagerly chained whenever possible, for
- * maximal performance. It is generally a good practice to allow maximal
- * chaining and increase operator parallelism
- */
- ALWAYS,
-
- /**
- * The operator will not be chained to the preceding or succeeding operators.
- */
- NEVER,
-
-
- HEAD
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
deleted file mode 100644
index 705c1b3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Interface for stream operators with one input. Use
- * {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if
- * you want to implement a custom operator.
- *
- * @param <IN> The input type of the operator
- * @param <OUT> The output type of the operator
- */
-public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
-
- /**
- * Processes one element that arrived at this operator.
- * This method is guaranteed to not be called concurrently with other methods of the operator.
- */
- void processElement(StreamRecord<IN> element) throws Exception;
-
- /**
- * Processes a {@link Watermark}.
- * This method is guaranteed to not be called concurrently with other methods of the operator.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- */
- void processWatermark(Watermark mark) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
deleted file mode 100644
index 0cbc954..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.util.Collector;
-
-/**
- * A {@link org.apache.flink.streaming.api.operators.StreamOperator} is supplied with an object
- * of this interface that can be used to emit elements and other messages, such as barriers
- * and watermarks, from an operator.
- *
- * @param <T> The type of the elements that can be emitted.
- */
-public interface Output<T> extends Collector<T> {
-
- /**
- * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
- * operators.
- *
- * <p>A watermark specifies that no element with a timestamp older or equal to the watermark
- * timestamp will be emitted in the future.
- */
- void emitWatermark(Watermark mark);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
deleted file mode 100644
index 1d05966..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-/**
- * Stream operators can implement this interface if they need access to the output type information
- * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for
- * cases where the output type is specified by the returns method and, thus, after the stream
- * operator has been created.
- */
-public interface OutputTypeConfigurable<OUT> {
-
- /**
- * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, StreamOperator, TypeInformation, TypeInformation, String)}
- * method when the {@link org.apache.flink.streaming.api.graph.StreamGraph} is generated. The
- * method is called with the output {@link TypeInformation} which is also used for the
- * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer.
- *
- * @param outTypeInfo Output type information of the {@link org.apache.flink.streaming.runtime.tasks.StreamTask}
- * @param executionConfig Execution configuration
- */
- void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
deleted file mode 100644
index efe5d52..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamCounter<IN> extends AbstractStreamOperator<Long> implements OneInputStreamOperator<IN, Long> {
-
- private static final long serialVersionUID = 1L;
-
- private Long count = 0L;
-
- public StreamCounter() {
- chainingStrategy = ChainingStrategy.ALWAYS;
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) {
- output.collect(element.replace(++count));
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
deleted file mode 100644
index 2ff220e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {
-
- private static final long serialVersionUID = 1L;
-
- public StreamFilter(FilterFunction<IN> filterFunction) {
- super(filterFunction);
- chainingStrategy = ChainingStrategy.ALWAYS;
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- if (userFunction.filter(element.getValue())) {
- output.collect(element);
- }
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
deleted file mode 100644
index 23b638e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamFlatMap<IN, OUT>
- extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
- implements OneInputStreamOperator<IN, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private transient TimestampedCollector<OUT> collector;
-
- public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
- super(flatMapper);
- chainingStrategy = ChainingStrategy.ALWAYS;
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- collector = new TimestampedCollector<OUT>(output);
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- collector.setTimestamp(element.getTimestamp());
- userFunction.flatMap(element.getValue(), collector);
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
deleted file mode 100644
index cf6b489..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamGroupedFold<IN, OUT, KEY>
- extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
- implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private static final String STATE_NAME = "_op_state";
-
- // Grouped values
- private transient OperatorState<OUT> values;
-
- private transient OUT initialValue;
-
- // Initial value serialization
- private byte[] serializedInitialValue;
-
- private TypeSerializer<OUT> outTypeSerializer;
-
- public StreamGroupedFold(FoldFunction<IN, OUT> folder, OUT initialValue) {
- super(folder);
- this.initialValue = initialValue;
- }
-
- @Override
- public void open() throws Exception {
- super.open();
-
- if (serializedInitialValue == null) {
- throw new RuntimeException("No initial value was serialized for the fold " +
- "operator. Probably the setOutputType method was not called.");
- }
-
- ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
- InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
- new DataInputStream(bais)
- );
- initialValue = outTypeSerializer.deserialize(in);
- values = createKeyValueState(STATE_NAME, outTypeSerializer, null);
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- OUT value = values.value();
-
- if (value != null) {
- OUT folded = userFunction.fold(outTypeSerializer.copy(value), element.getValue());
- values.update(folded);
- output.collect(element.replace(folded));
- } else {
- OUT first = userFunction.fold(outTypeSerializer.copy(initialValue), element.getValue());
- values.update(first);
- output.collect(element.replace(first));
- }
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-
- @Override
- public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
- outTypeSerializer = outTypeInfo.createSerializer(executionConfig);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
- new DataOutputStream(baos)
- );
-
- try {
- outTypeSerializer.serialize(initialValue, out);
- } catch (IOException ioe) {
- throw new RuntimeException("Unable to serialize initial value of type " +
- initialValue.getClass().getSimpleName() + " of fold operator.", ioe);
- }
-
- serializedInitialValue = baos.toByteArray();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
deleted file mode 100644
index ae15e92..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
- implements OneInputStreamOperator<IN, IN> {
-
- private static final long serialVersionUID = 1L;
-
- private static final String STATE_NAME = "_op_state";
-
- private transient OperatorState<IN> values;
-
- private TypeSerializer<IN> serializer;
-
-
- public StreamGroupedReduce(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
- super(reducer);
- this.serializer = serializer;
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- values = createKeyValueState(STATE_NAME, serializer, null);
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- IN value = element.getValue();
- IN currentValue = values.value();
-
- if (currentValue != null) {
- IN reduced = userFunction.reduce(currentValue, value);
- values.update(reduced);
- output.collect(element.replace(reduced));
- } else {
- values.update(value);
- output.collect(element.replace(value));
- }
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
deleted file mode 100644
index 7d5c7cc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamMap<IN, OUT>
- extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
- implements OneInputStreamOperator<IN, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- public StreamMap(MapFunction<IN, OUT> mapper) {
- super(mapper);
- chainingStrategy = ChainingStrategy.ALWAYS;
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- output.collect(element.replace(userFunction.map(element.getValue())));
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
deleted file mode 100644
index fac26f1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-
-/**
- * Basic interface for stream operators. Implementers would implement one of
- * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
- * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
- * that process elements.
- *
- * <p> The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
- * offers default implementation for the lifecycle and properties methods.
- *
- * <p> Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
- * the timer service, timer callbacks are also guaranteed not to be called concurrently with
- * methods on {@code StreamOperator}.
- *
- * @param <OUT> The output type of the operator
- */
-public interface StreamOperator<OUT> extends Serializable {
-
- // ------------------------------------------------------------------------
- // life cycle
- // ------------------------------------------------------------------------
-
- /**
- * Initializes the operator. Sets access to the context and the output.
- */
- void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);
-
- /**
- * This method is called immediately before any elements are processed, it should contain the
- * operator's initialization logic.
- *
- * @throws java.lang.Exception An exception in this method causes the operator to fail.
- */
- void open() throws Exception;
-
- /**
- * This method is called after all records have been added to the operators via the methods
- * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
- * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
- * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
- *
- * <p>
- * The method is expected to flush all remaining buffered data. Exceptions during this flushing
- * of buffered data should be propagated, in order to cause the operation to be recognized as failed,
- * because the last data items are not processed properly.
- *
- * @throws java.lang.Exception An exception in this method causes the operator to fail.
- */
- void close() throws Exception;
-
- /**
- * This method is called at the very end of the operator's life, both in the case of a successful
- * completion of the operation, and in the case of a failure and canceling.
- *
- * <p>This method is expected to make a thorough effort to release all resources
- * that the operator has acquired.
- */
- void dispose();
-
- // ------------------------------------------------------------------------
- // state snapshots
- // ------------------------------------------------------------------------
-
- /**
- * Called to draw a state snapshot from the operator. This method snapshots the operator state
- * (if the operator is stateful) and the key/value state (if it is being used and has been
- * initialized).
- *
- * @param checkpointId The ID of the checkpoint.
- * @param timestamp The timestamp of the checkpoint.
- *
- * @return The StreamTaskState object, possibly containing the snapshots for the
- * operator and key/value state.
- *
- * @throws Exception Forwards exceptions that occur while drawing snapshots from the operator
- * and the key/value state.
- */
- StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;
-
- /**
- * Restores the operator state, if this operator's execution is recovering from a checkpoint.
- * This method restores the operator state (if the operator is stateful) and the key/value state
- * (if it had been used and was initialized when the snapshot occurred).
- *
- * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)}
- * and before {@link #open()}.
- *
- * @param state The state of operator that was snapshotted as part of checkpoint
- * from which the execution is restored.
- *
- * @throws Exception Exceptions during state restore should be forwarded, so that the system can
- * properly react to failed state restore and fail the execution attempt.
- */
- void restoreState(StreamTaskState state) throws Exception;
-
- /**
- * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager.
- *
- * @param checkpointId The ID of the checkpoint that has been completed.
- *
- * @throws Exception Exceptions during checkpoint acknowledgement may be forwarded and will cause
- * the program to fail and enter recovery.
- */
- void notifyOfCompletedCheckpoint(long checkpointId) throws Exception;
-
- // ------------------------------------------------------------------------
- // miscellaneous
- // ------------------------------------------------------------------------
-
- void setKeyContextElement(StreamRecord<?> record) throws Exception;
-
- /**
- * An operator can return true here to disable copying of its input elements. This overrides
- * the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig}.
- */
- boolean isInputCopyingDisabled();
-
- ChainingStrategy getChainingStrategy();
-
- void setChainingStrategy(ChainingStrategy strategy);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
deleted file mode 100644
index 1ce4ff6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamProject<IN, OUT extends Tuple>
- extends AbstractStreamOperator<OUT>
- implements OneInputStreamOperator<IN, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private TypeSerializer<OUT> outSerializer;
- private int[] fields;
- private int numFields;
-
- private transient OUT outTuple;
-
- public StreamProject(int[] fields, TypeSerializer<OUT> outSerializer) {
- this.fields = fields;
- this.numFields = this.fields.length;
- this.outSerializer = outSerializer;
-
- chainingStrategy = ChainingStrategy.ALWAYS;
- }
-
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- for (int i = 0; i < this.numFields; i++) {
- outTuple.setField(((Tuple) element.getValue()).getField(fields[i]), i);
- }
- output.collect(element.replace(outTuple));
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- outTuple = outSerializer.createInstance();
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
deleted file mode 100644
index 6961a4d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
- implements OneInputStreamOperator<IN, Object> {
-
- private static final long serialVersionUID = 1L;
-
- public StreamSink(SinkFunction<IN> sinkFunction) {
- super(sinkFunction);
-
- chainingStrategy = ChainingStrategy.ALWAYS;
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- userFunction.invoke(element.getValue());
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- // ignore it for now, we are a sink, after all
- }
-}