You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:55 UTC
[39/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
deleted file mode 100644
index e5fa4c3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.aggregation;
-
-import java.io.Serializable;
-
-public abstract class SumFunction implements Serializable{
-
- private static final long serialVersionUID = 1L;
-
- public abstract Object add(Object o1, Object o2);
-
- public static SumFunction getForClass(Class<?> clazz) {
-
- if (clazz == Integer.class) {
- return new IntSum();
- } else if (clazz == Long.class) {
- return new LongSum();
- } else if (clazz == Short.class) {
- return new ShortSum();
- } else if (clazz == Double.class) {
- return new DoubleSum();
- } else if (clazz == Float.class) {
- return new FloatSum();
- } else if (clazz == Byte.class) {
- return new ByteSum();
- } else {
- throw new RuntimeException("DataStream cannot be summed because the class "
- + clazz.getSimpleName() + " does not support the + operator.");
- }
- }
-
- public static class IntSum extends SumFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object add(Object value1, Object value2) {
- return (Integer) value1 + (Integer) value2;
- }
- }
-
- public static class LongSum extends SumFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object add(Object value1, Object value2) {
- return (Long) value1 + (Long) value2;
- }
- }
-
- public static class DoubleSum extends SumFunction {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object add(Object value1, Object value2) {
- return (Double) value1 + (Double) value2;
- }
- }
-
- public static class ShortSum extends SumFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object add(Object value1, Object value2) {
- return (short) ((Short) value1 + (Short) value2);
- }
- }
-
- public static class FloatSum extends SumFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object add(Object value1, Object value2) {
- return (Float) value1 + (Float) value2;
- }
- }
-
- public static class ByteSum extends SumFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object add(Object value1, Object value2) {
- return (byte) ((Byte) value1 + (Byte) value2);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
deleted file mode 100644
index ae11cd9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.util.Collector;
-
-/**
- * A CoFlatMapFunction represents a FlatMap transformation with two different
- * input types.
- *
- * @param <IN1>
- * Type of the first input.
- * @param <IN2>
- * Type of the second input.
- * @param <OUT>
- * Output type.
- */
-public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
-
- void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
-
- void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
deleted file mode 100644
index a545282..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-
-/**
- * A CoMapFunction represents a Map transformation with two different input
- * types.
- *
- * @param <IN1>
- * Type of the first input.
- * @param <IN2>
- * Type of the second input.
- * @param <OUT>
- * Output type.
- */
-public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
-
- OUT map1(IN1 value) throws Exception;
-
- OUT map2(IN2 value) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
deleted file mode 100644
index 6746140..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * A RichCoFlatMapFunction represents a FlatMap transformation with two different input
- * types. In addition to that the user can use the features provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN1>
- * Type of the first input.
- * @param <IN2>
- * Type of the second input.
- * @param <OUT>
- * Output type.
- */
-public abstract class RichCoFlatMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
- CoFlatMapFunction<IN1, IN2, OUT> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
deleted file mode 100644
index e561408..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * A RichCoMapFunction represents a Map transformation with two different input
- * types. In addition to that the user can use the features provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN1>
- * Type of the first input.
- * @param <IN2>
- * Type of the second input.
- * @param <OUT>
- * Output type.
- */
-public abstract class RichCoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
- CoMapFunction<IN1, IN2, OUT> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
deleted file mode 100644
index 504bc39..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple implementation of the SinkFunction writing tuples in the specified
- * OutputFormat format. Tuples are collected to a list and written to the file
- * periodically. The target path and the overwrite mode are pre-packaged in
- * format.
- *
- * @param <IN>
- * Input type
- */
-public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(FileSinkFunction.class);
-
- protected ArrayList<IN> tupleList = new ArrayList<IN>();
- protected volatile OutputFormat<IN> format;
- protected volatile boolean cleanupCalled = false;
- protected int indexInSubtaskGroup;
- protected int currentNumberOfSubtasks;
-
- public FileSinkFunction(OutputFormat<IN> format) {
- this.format = format;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- RuntimeContext context = getRuntimeContext();
- format.configure(parameters);
- indexInSubtaskGroup = context.getIndexOfThisSubtask();
- currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
- format.open(indexInSubtaskGroup, currentNumberOfSubtasks);
- }
-
- @Override
- public void invoke(IN record) throws Exception {
- tupleList.add(record);
- if (updateCondition()) {
- flush();
- }
- }
-
- @Override
- public void close() throws IOException {
- if (!tupleList.isEmpty()) {
- flush();
- }
- try {
- format.close();
- } catch (Exception ex) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Error while writing element.", ex);
- }
- try {
- if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
- cleanupCalled = true;
- ((CleanupWhenUnsuccessful) format).tryCleanupOnError();
- }
- } catch (Throwable t) {
- LOG.error("Cleanup on error failed.", t);
- }
- }
- }
-
- protected void flush() {
- try {
- for (IN rec : tupleList) {
- format.writeRecord(rec);
- }
- } catch (Exception ex) {
- try {
- if (LOG.isErrorEnabled()) {
- LOG.error("Error while writing element.", ex);
- }
- if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
- cleanupCalled = true;
- ((CleanupWhenUnsuccessful) format).tryCleanupOnError();
- }
- } catch (Throwable t) {
- LOG.error("Cleanup on error failed.", t);
- }
- throw new RuntimeException(ex);
- }
- resetParameters();
- }
-
- /**
- * Condition for writing the contents of tupleList and clearing it.
- *
- * @return value of the updating condition
- */
- protected abstract boolean updateCondition();
-
- /**
- * Statements to be executed after writing a batch goes here.
- */
- protected abstract void resetParameters();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java
deleted file mode 100644
index 86bbb53..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import org.apache.flink.api.common.io.OutputFormat;
-
-/**
- * Implementation of FileSinkFunction. Writes tuples to file in every millis
- * milliseconds.
- *
- * @param <IN>
- * Input type
- */
-public class FileSinkFunctionByMillis<IN> extends FileSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- private final long millis;
- private long lastTime;
-
- public FileSinkFunctionByMillis(OutputFormat<IN> format, long millis) {
- super(format);
- this.millis = millis;
- lastTime = System.currentTimeMillis();
- }
-
- /**
- * Condition for writing the contents of tupleList and clearing it.
- *
- * @return value of the updating condition
- */
- @Override
- protected boolean updateCondition() {
- return System.currentTimeMillis() - lastTime >= millis;
- }
-
- /**
- * Statements to be executed after writing a batch goes here.
- */
- @Override
- protected void resetParameters() {
- tupleList.clear();
- lastTime = System.currentTimeMillis();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
deleted file mode 100644
index 93a91cd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.PrintStream;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
-/**
- * Implementation of the SinkFunction writing every tuple to the standard
- * output or standard error stream.
- *
- * @param <IN>
- * Input record type
- */
-public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- private static final boolean STD_OUT = false;
- private static final boolean STD_ERR = true;
-
- private boolean target;
- private transient PrintStream stream;
- private transient String prefix;
-
- /**
- * Instantiates a print sink function that prints to standard out.
- */
- public PrintSinkFunction() {}
-
- /**
- * Instantiates a print sink function that prints to standard out.
- *
- * @param stdErr True, if the format should print to standard error instead of standard out.
- */
- public PrintSinkFunction(boolean stdErr) {
- target = stdErr;
- }
-
- public void setTargetToStandardOut() {
- target = STD_OUT;
- }
-
- public void setTargetToStandardErr() {
- target = STD_ERR;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
- // get the target stream
- stream = target == STD_OUT ? System.out : System.err;
-
- // set the prefix if we have a >1 parallelism
- prefix = (context.getNumberOfParallelSubtasks() > 1) ?
- ((context.getIndexOfThisSubtask() + 1) + "> ") : null;
- }
-
- @Override
- public void invoke(IN record) {
- if (prefix != null) {
- stream.println(prefix + record.toString());
- }
- else {
- stream.println(record.toString());
- }
- }
-
- @Override
- public void close() {
- this.stream = null;
- this.prefix = null;
- }
-
- @Override
- public String toString() {
- return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
deleted file mode 100644
index 7853758..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
-
- private static final long serialVersionUID = 1L;
-
- public abstract void invoke(IN value) throws Exception;
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
deleted file mode 100644
index 21308ed..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-
-/**
- * Interface for implementing user defined sink functionality.
- *
- * @param <IN> Input type parameter.
- */
-public interface SinkFunction<IN> extends Function, Serializable {
-
- /**
- * Function for standard sink behaviour. This function is called for every record.
- *
- * @param value The input record.
- * @throws Exception
- */
- void invoke(IN value) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
deleted file mode 100644
index 1356263..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.SerializableObject;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
- * <p>
- * The sink can be set to retry message sends after the sending failed.
- * <p>
- * The sink can be set to 'autoflush', in which case the socket stream is flushed after every message. This
- * significantly reduced throughput, but also decreases message latency.
- *
- * @param <IN> data to be written into the Socket.
- */
-public class SocketClientSink<IN> extends RichSinkFunction<IN> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
-
- private static final int CONNECTION_RETRY_DELAY = 500;
-
-
- private final SerializableObject lock = new SerializableObject();
- private final SerializationSchema<IN, byte[]> schema;
- private final String hostName;
- private final int port;
- private final int maxNumRetries;
- private final boolean autoFlush;
-
- private transient Socket client;
- private transient OutputStream outputStream;
-
- private int retries;
-
- private volatile boolean isRunning = true;
-
- /**
- * Creates a new SocketClientSink. The sink will not attempt to retry connections upon failure
- * and will not auto-flush the stream.
- *
- * @param hostName Hostname of the server to connect to.
- * @param port Port of the server.
- * @param schema Schema used to serialize the data into bytes.
- */
- public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
- this(hostName, port, schema, 0);
- }
-
- /**
- * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
- * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
- * The sink will not auto-flush the stream.
- *
- * @param hostName Hostname of the server to connect to.
- * @param port Port of the server.
- * @param schema Schema used to serialize the data into bytes.
- * @param maxNumRetries The maximum number of retries after a message send failed.
- */
- public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema, int maxNumRetries) {
- this(hostName, port, schema, maxNumRetries, false);
- }
-
- /**
- * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
- * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
- *
- * @param hostName Hostname of the server to connect to.
- * @param port Port of the server.
- * @param schema Schema used to serialize the data into bytes.
- * @param maxNumRetries The maximum number of retries after a message send failed.
- * @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
- */
- public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema,
- int maxNumRetries, boolean autoflush)
- {
- checkArgument(port > 0 && port < 65536, "port is out of range");
- checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
-
- this.hostName = checkNotNull(hostName, "hostname must not be null");
- this.port = port;
- this.schema = checkNotNull(schema);
- this.maxNumRetries = maxNumRetries;
- this.autoFlush = autoflush;
- }
-
- // ------------------------------------------------------------------------
- // Life cycle
- // ------------------------------------------------------------------------
-
- /**
- * Initialize the connection with the Socket in the server.
- * @param parameters Configuration.
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- try {
- synchronized (lock) {
- createConnection();
- }
- }
- catch (IOException e) {
- throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
- }
- }
-
-
- /**
- * Called when new data arrives to the sink, and forwards it to Socket.
- *
- * @param value The value to write to the socket.
- */
- @Override
- public void invoke(IN value) throws Exception {
- byte[] msg = schema.serialize(value);
-
- try {
- outputStream.write(msg);
- if (autoFlush) {
- outputStream.flush();
- }
- }
- catch (IOException e) {
- // if no re-tries are enable, fail immediately
- if (maxNumRetries == 0) {
- throw new IOException("Failed to send message '" + value + "' to socket server at "
- + hostName + ":" + port + ". Connection re-tries are not enabled.", e);
- }
-
- LOG.error("Failed to send message '" + value + "' to socket server at " + hostName + ":" + port +
- ". Trying to reconnect..." , e);
-
- // do the retries in locked scope, to guard against concurrent close() calls
- // note that the first re-try comes immediately, without a wait!
-
- synchronized (lock) {
- IOException lastException = null;
- retries = 0;
-
- while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {
-
- // first, clean up the old resources
- try {
- if (outputStream != null) {
- outputStream.close();
- }
- }
- catch (IOException ee) {
- LOG.error("Could not close output stream from failed write attempt", ee);
- }
- try {
- if (client != null) {
- client.close();
- }
- }
- catch (IOException ee) {
- LOG.error("Could not close socket from failed write attempt", ee);
- }
-
- // try again
- retries++;
-
- try {
- // initialize a new connection
- createConnection();
-
- // re-try the write
- outputStream.write(msg);
-
- // success!
- return;
- }
- catch (IOException ee) {
- lastException = ee;
- LOG.error("Re-connect to socket server and send message failed. Retry time(s): " + retries, ee);
- }
-
- // wait before re-attempting to connect
- lock.wait(CONNECTION_RETRY_DELAY);
- }
-
- // throw an exception if the task is still running, otherwise simply leave the method
- if (isRunning) {
- throw new IOException("Failed to send message '" + value + "' to socket server at "
- + hostName + ":" + port + ". Failed after " + retries + " retries.", lastException);
- }
- }
- }
- }
-
- /**
- * Closes the connection with the Socket server.
- */
- @Override
- public void close() throws Exception {
- // flag this as not running any more
- isRunning = false;
-
- // clean up in locked scope, so there is no concurrent change to the stream and client
- synchronized (lock) {
- // we notify first (this statement cannot fail). The notified thread will not continue
- // anyways before it can re-acquire the lock
- lock.notifyAll();
-
- try {
- if (outputStream != null) {
- outputStream.close();
- }
- }
- finally {
- if (client != null) {
- client.close();
- }
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private void createConnection() throws IOException {
- client = new Socket(hostName, port);
- client.setKeepAlive(true);
- client.setTcpNoDelay(true);
-
- outputStream = client.getOutputStream();
- }
-
- // ------------------------------------------------------------------------
- // For testing
- // ------------------------------------------------------------------------
-
- int getCurrentNumberOfRetries() {
- synchronized (lock) {
- return retries;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
deleted file mode 100644
index 019d35f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-
-/**
- * Abstract class for formatting the output of the writeAsText and writeAsCsv
- * functions.
- *
- * @param <IN>
- * Input tuple type
- */
-public abstract class WriteFormat<IN> implements Serializable {
- private static final long serialVersionUID = 1L;
-
- /**
- * Writes the contents of tupleList to the file specified by path.
- *
- * @param path
- * is the path to the location where the tuples are written
- * @param tupleList
- * is the list of tuples to be written
- */
- protected abstract void write(String path, ArrayList<IN> tupleList);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
deleted file mode 100644
index bfae653..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Writes tuples in csv format.
- *
- * @param <IN>
- * Input tuple type
- */
-public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
- private static final long serialVersionUID = 1L;
-
- @Override
- protected void write(String path, ArrayList<IN> tupleList) {
- try {
- PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
- for (IN tupleToWrite : tupleList) {
- String strTuple = tupleToWrite.toString();
- outStream.println(strTuple.substring(1, strTuple.length() - 1));
- }
- outStream.close();
- } catch (IOException e) {
- throw new RuntimeException("Exception occured while writing file " + path, e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
deleted file mode 100644
index 03fcb5c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Writes tuples in text format.
- *
- * @param <IN>
- * Input tuple type
- */
-public class WriteFormatAsText<IN> extends WriteFormat<IN> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void write(String path, ArrayList<IN> tupleList) {
- try {
- PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
- for (IN tupleToWrite : tupleList) {
- outStream.println(tupleToWrite);
- }
- outStream.close();
- } catch (IOException e) {
- throw new RuntimeException("Exception occured while writing file " + path, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
deleted file mode 100644
index 27c352f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.FileNotFoundException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Simple implementation of the SinkFunction writing tuples as simple text to
- * the file specified by path. Tuples are collected to a list and written to the
- * file periodically. The file specified by path is created if it does not
- * exist, cleared if it exists before the writing.
- *
- * @param <IN>
- * Input tuple type
- */
-public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- protected final String path;
- protected ArrayList<IN> tupleList = new ArrayList<IN>();
- protected WriteFormat<IN> format;
-
- public WriteSinkFunction(String path, WriteFormat<IN> format) {
- this.path = path;
- this.format = format;
- cleanFile(path);
- }
-
- /**
- * Creates target file if it does not exist, cleans it if it exists.
- *
- * @param path
- * is the path to the location where the tuples are written
- */
- protected void cleanFile(String path) {
- try {
- PrintWriter writer;
- writer = new PrintWriter(path);
- writer.print("");
- writer.close();
- } catch (FileNotFoundException e) {
- throw new RuntimeException("An error occurred while cleaning the file: " + e.getMessage(), e);
- }
- }
-
- /**
- * Condition for writing the contents of tupleList and clearing it.
- *
- * @return value of the updating condition
- */
- protected abstract boolean updateCondition();
-
- /**
- * Statements to be executed after writing a batch goes here.
- */
- protected abstract void resetParameters();
-
- /**
- * Implementation of the invoke method of the SinkFunction class. Collects
- * the incoming tuples in tupleList and appends the list to the end of the
- * target file if updateCondition() is true or the current tuple is the
- * endTuple.
- */
- @Override
- public void invoke(IN tuple) {
-
- tupleList.add(tuple);
- if (updateCondition()) {
- format.write(path, tupleList);
- resetParameters();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
deleted file mode 100644
index 0364174..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-/**
- * Implementation of WriteSinkFunction. Writes tuples to file in every millis
- * milliseconds.
- *
- * @param <IN>
- * Input tuple type
- */
-public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- private final long millis;
- private long lastTime;
-
- public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis) {
- super(path, format);
- this.millis = millis;
- lastTime = System.currentTimeMillis();
- }
-
- @Override
- protected boolean updateCondition() {
- return System.currentTimeMillis() - lastTime >= millis;
- }
-
- @Override
- protected void resetParameters() {
- tupleList.clear();
- lastTime = System.currentTimeMillis();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java
deleted file mode 100644
index 0d107f6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements ResultTypeQueryable<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- protected DeserializationSchema<OUT> schema;
-
- public ConnectorSource(DeserializationSchema<OUT> schema) {
- this.schema = schema;
- }
-
- @Override
- public TypeInformation<OUT> getProducedType() {
- return schema.getProducedType();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
deleted file mode 100644
index ab380d7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.source;
-
-
-/**
- * A marker interface that must be implemented by {@link SourceFunction}s that emit elements with
- * timestamps. The {@link SourceFunction} can extract the timestamp from the data and attach it to
- * the element upon emission.
- *
- * <p>
- * Event-time sources must manually emit
- * {@link org.apache.flink.streaming.api.watermark.Watermark watermarks} to keep track of progress.
- * Automatic emission of watermarks will be suppressed if a source implements this interface.
- *
- * <p>
- * Elements must be emitted using
- * {@link SourceFunction.SourceContext#collectWithTimestamp(Object, long)}
- * and watermarks can be emitted using
- * {@link SourceFunction.SourceContext#emitWatermark(org.apache.flink.streaming.api.watermark.Watermark)}.
- *
- * @param <T> Type of the elements emitted by this source.
- */
-public interface EventTimeSourceFunction<T> extends SourceFunction<T> { }
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
deleted file mode 100644
index a217923..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> {
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(FileMonitoringFunction.class);
-
- public enum WatchType {
- ONLY_NEW_FILES, // Only new files will be processed.
- REPROCESS_WITH_APPENDED, // When some files are appended, all contents
- // of the files will be processed.
- PROCESS_ONLY_APPENDED // When some files are appended, only appended
- // contents will be processed.
- }
-
- private String path;
- private long interval;
- private WatchType watchType;
-
- private Map<String, Long> offsetOfFiles;
- private Map<String, Long> modificationTimes;
-
- private volatile boolean isRunning = true;
-
- public FileMonitoringFunction(String path, long interval, WatchType watchType) {
- this.path = path;
- this.interval = interval;
- this.watchType = watchType;
- this.modificationTimes = new HashMap<String, Long>();
- this.offsetOfFiles = new HashMap<String, Long>();
- }
-
- @Override
- public void run(SourceContext<Tuple3<String, Long, Long>> ctx) throws Exception {
- FileSystem fileSystem = FileSystem.get(new URI(path));
-
- while (isRunning) {
- List<String> files = listNewFiles(fileSystem);
- for (String filePath : files) {
- if (watchType == WatchType.ONLY_NEW_FILES
- || watchType == WatchType.REPROCESS_WITH_APPENDED) {
- ctx.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
- offsetOfFiles.put(filePath, -1L);
- } else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
- long offset = 0;
- long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen();
- if (offsetOfFiles.containsKey(filePath)) {
- offset = offsetOfFiles.get(filePath);
- }
-
- ctx.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
- offsetOfFiles.put(filePath, fileSize);
-
- LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);
- }
- }
-
- Thread.sleep(interval);
- }
- }
-
- private List<String> listNewFiles(FileSystem fileSystem) throws IOException {
- List<String> files = new ArrayList<String>();
-
- FileStatus[] statuses = fileSystem.listStatus(new Path(path));
-
- if (statuses == null) {
- LOG.warn("Path does not exist: {}", path);
- } else {
- for (FileStatus status : statuses) {
- Path filePath = status.getPath();
- String fileName = filePath.getName();
- long modificationTime = status.getModificationTime();
-
- if (!isFiltered(fileName, modificationTime)) {
- files.add(filePath.toString());
- modificationTimes.put(fileName, modificationTime);
- }
- }
- }
-
- return files;
- }
-
- private boolean isFiltered(String fileName, long modificationTime) {
-
- if ((watchType == WatchType.ONLY_NEW_FILES && modificationTimes.containsKey(fileName))
- || fileName.startsWith(".") || fileName.contains("_COPYING_")) {
- return true;
- } else {
- Long lastModification = modificationTimes.get(fileName);
- return lastModification != null && lastModification >= modificationTime;
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
deleted file mode 100644
index 4f859e8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.URI;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
-
-public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, Long>, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(Tuple3<String, Long, Long> value, Collector<String> out) throws Exception {
- FSDataInputStream stream = FileSystem.get(new URI(value.f0)).open(new Path(value.f0));
- stream.seek(value.f1);
-
- BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
- String line;
-
- try {
- while ((line = reader.readLine()) != null && (value.f2 == -1L || stream.getPos() <= value.f2)) {
- out.collect(line);
- }
- } finally {
- reader.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
deleted file mode 100644
index cc3925c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
- private static final long serialVersionUID = 1L;
-
- private TypeInformation<OUT> typeInfo;
- private transient TypeSerializer<OUT> serializer;
-
- private InputFormat<OUT, InputSplit> format;
-
- private transient InputSplitProvider provider;
- private transient Iterator<InputSplit> splitIterator;
-
- private volatile boolean isRunning = true;
-
- @SuppressWarnings("unchecked")
- public FileSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
- this.format = (InputFormat<OUT, InputSplit>) format;
- this.typeInfo = typeInfo;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void open(Configuration parameters) throws Exception {
- StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
- this.provider = context.getInputSplitProvider();
-
- format.configure(parameters);
- serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
-
- splitIterator = getInputSplits();
- if (splitIterator.hasNext()) {
- format.open(splitIterator.next());
- }
- isRunning = true;
- }
-
- @Override
- public void close() throws Exception {
- format.close();
- }
-
- private Iterator<InputSplit> getInputSplits() {
-
- return new Iterator<InputSplit>() {
-
- private InputSplit nextSplit;
-
- private boolean exhausted;
-
- @Override
- public boolean hasNext() {
- if (exhausted) {
- return false;
- }
-
- if (nextSplit != null) {
- return true;
- }
-
- InputSplit split = provider.getNextInputSplit();
-
- if (split != null) {
- this.nextSplit = split;
- return true;
- } else {
- exhausted = true;
- return false;
- }
- }
-
- @Override
- public InputSplit next() {
- if (this.nextSplit == null && !hasNext()) {
- throw new NoSuchElementException();
- }
-
- final InputSplit tmp = this.nextSplit;
- this.nextSplit = null;
- return tmp;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- @Override
- public void run(SourceContext<OUT> ctx) throws Exception {
- while (isRunning) {
- OUT nextElement = serializer.createInstance();
- nextElement = format.nextRecord(nextElement);
- if (nextElement == null && splitIterator.hasNext()) {
- format.open(splitIterator.next());
- continue;
- } else if (nextElement == null) {
- break;
- }
- ctx.collect(nextElement);
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
-
- /**
- * Returns the {@code InputFormat}. This is only needed because we need to set the input
- * split assigner on the {@code StreamGraph}.
- */
- public InputFormat<OUT, InputSplit> getFormat() {
- return format;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
deleted file mode 100644
index af47f59..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
- * A stream source function that returns a sequence of elements.
- *
- * <p>Upon construction, this source function serializes the elements using Flink's type information.
- * That way, any object transport using Java serialization will not be affected by the serializability
- * of the elements.</p>
- *
- * @param <T> The type of elements returned by this function.
- */
-public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedAsynchronously<Integer> {
-
- private static final long serialVersionUID = 1L;
-
- /** The (de)serializer to be used for the data elements */
- private final TypeSerializer<T> serializer;
-
- /** The actual data elements, in serialized form */
- private final byte[] elementsSerialized;
-
- /** The number of serialized elements */
- private final int numElements;
-
- /** The number of elements emitted already */
- private volatile int numElementsEmitted;
-
- /** The number of elements to skip initially */
- private volatile int numElementsToSkip;
-
- /** Flag to make the source cancelable */
- private volatile boolean isRunning = true;
-
-
- public FromElementsFunction(TypeSerializer<T> serializer, T... elements) throws IOException {
- this(serializer, Arrays.asList(elements));
- }
-
- public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> elements) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
-
- int count = 0;
- try {
- for (T element : elements) {
- serializer.serialize(element, wrapper);
- count++;
- }
- }
- catch (Exception e) {
- throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
- }
-
- this.serializer = serializer;
- this.elementsSerialized = baos.toByteArray();
- this.numElements = count;
- }
-
- @Override
- public void run(SourceContext<T> ctx) throws Exception {
- ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
- final DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
-
- // if we are restored from a checkpoint and need to skip elements, skip them now.
- int toSkip = numElementsToSkip;
- if (toSkip > 0) {
- try {
- while (toSkip > 0) {
- serializer.deserialize(input);
- toSkip--;
- }
- }
- catch (Exception e) {
- throw new IOException("Failed to deserialize an element from the source. " +
- "If you are using user-defined serialization (Value and Writable types), check the " +
- "serialization functions.\nSerializer is " + serializer);
- }
-
- this.numElementsEmitted = this.numElementsToSkip;
- }
-
- final Object lock = ctx.getCheckpointLock();
-
- while (isRunning && numElementsEmitted < numElements) {
- T next;
- try {
- next = serializer.deserialize(input);
- }
- catch (Exception e) {
- throw new IOException("Failed to deserialize an element from the source. " +
- "If you are using user-defined serialization (Value and Writable types), check the " +
- "serialization functions.\nSerializer is " + serializer);
- }
-
- synchronized (lock) {
- ctx.collect(next);
- numElementsEmitted++;
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
-
- /**
- * Gets the number of elements produced in total by this function.
- *
- * @return The number of elements produced in total.
- */
- public int getNumElements() {
- return numElements;
- }
-
- /**
- * Gets the number of elements emitted so far.
- *
- * @return The number of elements emitted so far.
- */
- public int getNumElementsEmitted() {
- return numElementsEmitted;
- }
-
- // ------------------------------------------------------------------------
- // Checkpointing
- // ------------------------------------------------------------------------
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return this.numElementsEmitted;
- }
-
- @Override
- public void restoreState(Integer state) {
- this.numElementsToSkip = state;
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- /**
- * Verifies that all elements in the collection are non-null, and are of the given class, or
- * a subclass thereof.
- *
- * @param elements The collection to check.
- * @param viewedAs The class to which the elements must be assignable to.
- *
- * @param <OUT> The generic type of the collection to be checked.
- */
- public static <OUT> void checkCollection(Collection<OUT> elements, Class<OUT> viewedAs) {
- for (OUT elem : elements) {
- if (elem == null) {
- throw new IllegalArgumentException("The collection contains a null element");
- }
-
- if (!viewedAs.isAssignableFrom(elem.getClass())) {
- throw new IllegalArgumentException("The elements in the collection are not all subclasses of " +
- viewedAs.getCanonicalName());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
deleted file mode 100644
index 655710e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import java.util.Iterator;
-
-public class FromIteratorFunction<T> implements SourceFunction<T> {
-
- private static final long serialVersionUID = 1L;
-
- private final Iterator<T> iterator;
-
- private volatile boolean isRunning = true;
-
- public FromIteratorFunction(Iterator<T> iterator) {
- this.iterator = iterator;
- }
-
- @Override
- public void run(SourceContext<T> ctx) throws Exception {
- while (isRunning && iterator.hasNext()) {
- ctx.collect(iterator.next());
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
deleted file mode 100644
index bc78e4d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.SplittableIterator;
-
-import java.util.Iterator;
-
-public class FromSplittableIteratorFunction<T> extends RichParallelSourceFunction<T> {
-
- private static final long serialVersionUID = 1L;
-
- private SplittableIterator<T> fullIterator;
-
- private transient Iterator<T> iterator;
-
- private volatile boolean isRunning = true;
-
- public FromSplittableIteratorFunction(SplittableIterator<T> iterator) {
- this.fullIterator = iterator;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- int numberOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexofThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
- iterator = fullIterator.split(numberOfSubTasks)[indexofThisSubTask];
- isRunning = true;
- }
-
- @Override
- public void run(SourceContext<T> ctx) throws Exception {
- while (isRunning && iterator.hasNext()) {
- ctx.collect(iterator.next());
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
deleted file mode 100644
index 3ac63af..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.runtime.state.SerializedCheckpointData;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Abstract base class for data sources that receive elements from a message queue and
- * acknowledge them back by IDs.
- * <p>
- * The mechanism for this source assumes that messages are identified by a unique ID.
- * When messages are taken from the message queue, the message must not be dropped immediately,
- * but must be retained until acknowledged. Messages that are not acknowledged within a certain
- * time interval will be served again (to a different connection, established by the recovered source).
- * <p>
- * Note that this source can give no guarantees about message order in teh case of failures,
- * because messages that were retrieved but not yet acknowledged will be returned later again, after
- * a set of messages that was not retrieved before the failure.
- * <p>
- * Internally, this source gathers the IDs of elements it emits. Per checkpoint, the IDs are stored and
- * acknowledged when the checkpoint is complete. That way, no message is acknowledged unless it is certain
- * that it has been successfully processed throughout the topology and the updates to any state caused by
- * that message are persistent.
- * <p>
- * All messages that are emitted and successfully processed by the streaming program will eventually be
- * acknowledged. In corner cases, the source may acknowledge certain IDs multiple times, if a
- * failure occurs while acknowledging.
- * <p>
- * A typical way to use this base in a source function is by implementing a run() method as follows:
- * <pre>{@code
- * public void run(SourceContext<Type> ctx) throws Exception {
- * while (running) {
- * Message msg = queue.retrieve();
- * synchronized (ctx.getCheckpointLock()) {
- * ctx.collect(msg.getMessageData());
- * addId(msg.getMessageId());
- * }
- * }
- * }
- * }</pre>
- *
- * @param <Type> The type of the messages created by the source.
- * @param <Id> The type of the IDs that are used for acknowledging elements.
- */
-public abstract class MessageAcknowledingSourceBase<Type, Id> extends RichSourceFunction<Type>
- implements Checkpointed<SerializedCheckpointData[]>, CheckpointNotifier {
-
- private static final long serialVersionUID = -8689291992192955579L;
-
- /** Serializer used to serialize the IDs for checkpoints */
- private final TypeSerializer<Id> idSerializer;
-
- /** The list gathering the IDs of messages emitted during the current checkpoint */
- private transient List<Id> idsForCurrentCheckpoint;
-
- /** The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion */
- private transient ArrayDeque<Tuple2<Long, List<Id>>> pendingCheckpoints;
-
-
- // ------------------------------------------------------------------------
-
- /**
- * Creates a new MessageAcknowledingSourceBase for IDs of teh given type.
- *
- * @param idClass The class of the message ID type, used to create a serializer for the message IDs.
- */
- protected MessageAcknowledingSourceBase(Class<Id> idClass) {
- this(TypeExtractor.getForClass(idClass));
- }
-
- /**
- * Creates a new MessageAcknowledingSourceBase for IDs of teh given type.
- *
- * @param idTypeInfo The type information of the message ID type, used to create a serializer for the message IDs.
- */
- protected MessageAcknowledingSourceBase(TypeInformation<Id> idTypeInfo) {
- this.idSerializer = idTypeInfo.createSerializer(new ExecutionConfig());
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- idsForCurrentCheckpoint = new ArrayList<>(64);
- pendingCheckpoints = new ArrayDeque<>();
- }
-
- @Override
- public void close() throws Exception {
- idsForCurrentCheckpoint.clear();
- pendingCheckpoints.clear();
- }
-
-
- // ------------------------------------------------------------------------
- // ID Checkpointing
- // ------------------------------------------------------------------------
-
- /**
- * This method must be implemented to acknowledge the given set of IDs back to the message queue.
- * @param ids The list od IDs to acknowledge.
- */
- protected abstract void acknowledgeIDs(List<Id> ids);
-
- /**
- * Adds an ID to be stored with the current checkpoint.
- * @param id The ID to add.
- */
- protected void addId(Id id) {
- idsForCurrentCheckpoint.add(id);
- }
-
- // ------------------------------------------------------------------------
- // Checkpointing the data
- // ------------------------------------------------------------------------
-
- @Override
- public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- pendingCheckpoints.addLast(new Tuple2<Long, List<Id>>(checkpointId, idsForCurrentCheckpoint));
- idsForCurrentCheckpoint = new ArrayList<>(64);
-
- return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
- }
-
- @Override
- public void restoreState(SerializedCheckpointData[] state) throws Exception {
- pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- for (Iterator<Tuple2<Long, List<Id>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
- Tuple2<Long, List<Id>> checkpoint = iter.next();
- long id = checkpoint.f0;
-
- if (id <= checkpointId) {
- acknowledgeIDs(checkpoint.f1);
- iter.remove();
- }
- else {
- break;
- }
- }
- }
-}