You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:57 UTC
[16/19] flink git commit: [streaming] Major internal renaming and
restructure
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
deleted file mode 100644
index f9aa79d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.util.Collector;
-
-public class JoinWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
- private static final long serialVersionUID = 1L;
-
- private KeySelector<IN1, ?> keySelector1;
- private KeySelector<IN2, ?> keySelector2;
- private JoinFunction<IN1, IN2, OUT> joinFunction;
-
- public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2,
- JoinFunction<IN1, IN2, OUT> joinFunction) {
- this.keySelector1 = keySelector1;
- this.keySelector2 = keySelector2;
- this.joinFunction = joinFunction;
- }
-
- @Override
- public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception {
-
- Map<Object, List<IN1>> map = build(first);
-
- for (IN2 record : second) {
- Object key = keySelector2.getKey(record);
- List<IN1> match = map.get(key);
- if (match != null) {
- for (IN1 matching : match) {
- out.collect(joinFunction.join(matching, record));
- }
- }
- }
-
- }
-
- private Map<Object, List<IN1>> build(List<IN1> records) throws Exception {
-
- Map<Object, List<IN1>> map = new HashMap<Object, List<IN1>>();
-
- for (IN1 record : records) {
- Object key = keySelector1.getKey(record);
- List<IN1> current = map.get(key);
- if (current == null) {
- current = new LinkedList<IN1>();
- map.put(key, current);
- }
- current.add(record);
- }
-
- return map;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
deleted file mode 100644
index 2458f1b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * A RichCoFlatMapFunction represents a FlatMap transformation with two different input
- * types. In addition to that the user can use the features provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN1>
- * Type of the first input.
- * @param <IN2>
- * Type of the second input.
- * @param <OUT>
- * Output type.
- */
-public abstract class RichCoFlatMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
- CoFlatMapFunction<IN1, IN2, OUT> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
deleted file mode 100644
index 20d520c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * A RichCoMapFunction represents a Map transformation with two different input
- * types. In addition to that the user can use the features provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN1>
- * Type of the first input.
- * @param <IN2>
- * Type of the second input.
- * @param <OUT>
- * Output type.
- */
-public abstract class RichCoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
- CoMapFunction<IN1, IN2, OUT> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
deleted file mode 100644
index 655923f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * A RichCoReduceFunction represents a Reduce transformation with two different input
- * types. In addition to that the user can use the features provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN1>
- * Type of the first input.
- * @param <IN2>
- * Type of the second input.
- * @param <OUT>
- * Output type.
- */
-public abstract class RichCoReduceFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
- CoReduceFunction<IN1, IN2, OUT> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
deleted file mode 100644
index 2709203..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.util.Collector;
-
-public abstract class RichCoWindowFunction<IN1, IN2, O> extends AbstractRichFunction implements
- CoWindowFunction<IN1, IN2, O> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public abstract void coWindow(List<IN1> first, List<IN2> second, Collector<O> out)
- throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
deleted file mode 100644
index 24beba1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple implementation of the SinkFunction writing tuples in the specified
- * OutputFormat format. Tuples are collected to a list and written to the file
- * periodically. The target path and the overwrite mode are pre-packaged in
- * format.
- *
- * @param <IN>
- * Input type
- */
-public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(FileSinkFunction.class);
- protected ArrayList<IN> tupleList = new ArrayList<IN>();
- protected volatile OutputFormat<IN> format;
- protected volatile boolean cleanupCalled = false;
- protected int indexInSubtaskGroup;
- protected int currentNumberOfSubtasks;
-
- public FileSinkFunction(OutputFormat<IN> format) {
- this.format = format;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
- format.configure(context.getTaskStubParameters());
- indexInSubtaskGroup = context.getIndexOfThisSubtask();
- currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
- format.open(indexInSubtaskGroup, currentNumberOfSubtasks);
- }
-
- @Override
- public void invoke(IN record) throws Exception {
- tupleList.add(record);
- if (updateCondition()) {
- flush();
- }
- }
-
- @Override
- public void close() throws IOException {
- if (!tupleList.isEmpty()) {
- flush();
- }
- try {
- format.close();
- } catch (Exception ex) {
- try {
- if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
- cleanupCalled = true;
- ((CleanupWhenUnsuccessful) format).tryCleanupOnError();
- }
- } catch (Throwable t) {
- LOG.error("Cleanup on error failed.", t);
- }
- }
- }
-
- protected void flush() {
- try {
- for (IN rec : tupleList) {
- format.writeRecord(rec);
- }
- } catch (Exception ex) {
- try {
- if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
- cleanupCalled = true;
- ((CleanupWhenUnsuccessful) format).tryCleanupOnError();
- }
- } catch (Throwable t) {
- LOG.error("Cleanup on error failed.", t);
- }
- }
- resetParameters();
- }
-
- /**
- * Condition for writing the contents of tupleList and clearing it.
- *
- * @return value of the updating condition
- */
- protected abstract boolean updateCondition();
-
- /**
- * Statements to be executed after writing a batch goes here.
- */
- protected abstract void resetParameters();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
deleted file mode 100644
index f049a32..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import org.apache.flink.api.common.io.OutputFormat;
-
-/**
- * Implementation of FileSinkFunction. Writes tuples to file in every millis
- * milliseconds.
- *
- * @param <IN>
- * Input type
- */
-public class FileSinkFunctionByMillis<IN> extends FileSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- private final long millis;
- private long lastTime;
-
- public FileSinkFunctionByMillis(OutputFormat<IN> format, long millis) {
- super(format);
- this.millis = millis;
- lastTime = System.currentTimeMillis();
- }
-
- /**
- * Condition for writing the contents of tupleList and clearing it.
- *
- * @return value of the updating condition
- */
- @Override
- protected boolean updateCondition() {
- return System.currentTimeMillis() - lastTime >= millis;
- }
-
- /**
- * Statements to be executed after writing a batch goes here.
- */
- @Override
- protected void resetParameters() {
- tupleList.clear();
- lastTime = System.currentTimeMillis();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
deleted file mode 100644
index 947f8ab..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.PrintStream;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-
-/**
- * Implementation of the SinkFunction writing every tuple to the standard
- * output or standard error stream.
- *
- * @param <IN>
- * Input record type
- */
-public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- private static final boolean STD_OUT = false;
- private static final boolean STD_ERR = true;
-
- private boolean target;
- private transient PrintStream stream;
- private transient String prefix;
-
- /**
- * Instantiates a print sink function that prints to standard out.
- */
- public PrintSinkFunction() {}
-
- /**
- * Instantiates a print sink function that prints to standard out.
- *
- * @param stdErr True, if the format should print to standard error instead of standard out.
- */
- public PrintSinkFunction(boolean stdErr) {
- target = stdErr;
- }
-
- public void setTargetToStandardOut() {
- target = STD_OUT;
- }
-
- public void setTargetToStandardErr() {
- target = STD_ERR;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
- // get the target stream
- stream = target == STD_OUT ? System.out : System.err;
-
- // set the prefix if we have a >1 parallelism
- prefix = (context.getNumberOfParallelSubtasks() > 1) ?
- ((context.getIndexOfThisSubtask() + 1) + "> ") : null;
- }
-
- @Override
- public void invoke(IN record) {
- if (prefix != null) {
- stream.println(prefix + record.toString());
- }
- else {
- stream.println(record.toString());
- }
- }
-
- @Override
- public void close() {
- this.stream = null;
- this.prefix = null;
- }
-
- @Override
- public String toString() {
- return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
deleted file mode 100644
index 3b8a4db..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
-
- private static final long serialVersionUID = 1L;
-
- public abstract void invoke(IN value) throws Exception;
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
deleted file mode 100644
index eed4234..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-
-/**
- * Interface for implementing user defined sink functionality.
- *
- * @param <IN> Input type parameter.
- */
-public interface SinkFunction<IN> extends Function, Serializable {
-
- /**
- * Function for standard sink behaviour. This function is called for every record.
- *
- * @param value The input record.
- * @throws Exception
- */
- public void invoke(IN value) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
deleted file mode 100644
index c582c1b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
- *
- * @param <IN> data to be written into the Socket.
- */
-public class SocketClientSink<IN> extends RichSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
-
- private final String hostName;
- private final int port;
- private final SerializationSchema<IN, byte[]> schema;
- private transient Socket client;
- private transient DataOutputStream dataOutputStream;
-
- /**
- * Default constructor.
- *
- * @param hostName Host of the Socket server.
- * @param port Port of the Socket.
- * @param schema Schema of the data.
- */
- public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
- this.hostName = hostName;
- this.port = port;
- this.schema = schema;
- }
-
- /**
- * Initializes the connection to Socket.
- */
- public void intializeConnection() {
- OutputStream outputStream;
- try {
- client = new Socket(hostName, port);
- outputStream = client.getOutputStream();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- dataOutputStream = new DataOutputStream(outputStream);
- }
-
- /**
- * Called when new data arrives to the sink, and forwards it to Socket.
- *
- * @param value
- * The incoming data
- */
- @Override
- public void invoke(IN value) {
- byte[] msg = schema.serialize(value);
- try {
- dataOutputStream.write(msg);
- } catch (IOException e) {
- if(LOG.isErrorEnabled()){
- LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
- }
- }
- }
-
- /**
- * Closes the connection of the Socket client.
- */
- private void closeConnection(){
- try {
- dataOutputStream.flush();
- client.close();
- } catch (IOException e) {
- throw new RuntimeException("Error while closing connection with socket server at "
- + hostName + ":" + port, e);
- } finally {
- if (client != null) {
- try {
- client.close();
- } catch (IOException e) {
- LOG.error("Cannot close connection with socket server at "
- + hostName + ":" + port, e);
- }
- }
- }
- }
-
- /**
- * Initialize the connection with the Socket in the server.
- * @param parameters Configuration.
- */
- @Override
- public void open(Configuration parameters) {
- intializeConnection();
- }
-
- /**
- * Closes the connection with the Socket server.
- */
- @Override
- public void close() {
- closeConnection();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
deleted file mode 100644
index a606302..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-
-/**
- * Abstract class for formatting the output of the writeAsText and writeAsCsv
- * functions.
- *
- * @param <IN>
- * Input tuple type
- */
-public abstract class WriteFormat<IN> implements Serializable {
- private static final long serialVersionUID = 1L;
-
- /**
- * Writes the contents of tupleList to the file specified by path.
- *
- * @param path
- * is the path to the location where the tuples are written
- * @param tupleList
- * is the list of tuples to be written
- */
- protected abstract void write(String path, ArrayList<IN> tupleList);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
deleted file mode 100644
index b22fd80..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Writes tuples in csv format.
- *
- * @param <IN>
- * Input tuple type
- */
-public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
- private static final long serialVersionUID = 1L;
-
- @Override
- protected void write(String path, ArrayList<IN> tupleList) {
- try {
- PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
- for (IN tupleToWrite : tupleList) {
- String strTuple = tupleToWrite.toString();
- outStream.println(strTuple.substring(1, strTuple.length() - 1));
- }
- outStream.close();
- } catch (IOException e) {
- throw new RuntimeException("Exception occured while writing file " + path, e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
deleted file mode 100644
index 5891104..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Writes tuples in text format.
- *
- * @param <IN>
- * Input tuple type
- */
-public class WriteFormatAsText<IN> extends WriteFormat<IN> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void write(String path, ArrayList<IN> tupleList) {
- try {
- PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
- for (IN tupleToWrite : tupleList) {
- outStream.println(tupleToWrite);
- }
- outStream.close();
- } catch (IOException e) {
- throw new RuntimeException("Exception occured while writing file " + path, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
deleted file mode 100644
index 0c52afc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.FileNotFoundException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Simple implementation of the SinkFunction writing tuples as simple text to
- * the file specified by path. Tuples are collected to a list and written to the
- * file periodically. The file specified by path is created if it does not
- * exist, cleared if it exists before the writing.
- *
- * @param <IN>
- * Input tuple type
- */
-public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- protected final String path;
- protected ArrayList<IN> tupleList = new ArrayList<IN>();
- protected WriteFormat<IN> format;
-
- public WriteSinkFunction(String path, WriteFormat<IN> format) {
- this.path = path;
- this.format = format;
- cleanFile(path);
- }
-
- /**
- * Creates target file if it does not exist, cleans it if it exists.
- *
- * @param path
- * is the path to the location where the tuples are written
- */
- protected void cleanFile(String path) {
- try {
- PrintWriter writer;
- writer = new PrintWriter(path);
- writer.print("");
- writer.close();
- } catch (FileNotFoundException e) {
- throw new RuntimeException("File not found " + path, e);
- }
- }
-
- /**
- * Condition for writing the contents of tupleList and clearing it.
- *
- * @return value of the updating condition
- */
- protected abstract boolean updateCondition();
-
- /**
- * Statements to be executed after writing a batch goes here.
- */
- protected abstract void resetParameters();
-
- /**
- * Implementation of the invoke method of the SinkFunction class. Collects
- * the incoming tuples in tupleList and appends the list to the end of the
- * target file if updateCondition() is true or the current tuple is the
- * endTuple.
- */
- @Override
- public void invoke(IN tuple) {
-
- tupleList.add(tuple);
- if (updateCondition()) {
- format.write(path, tupleList);
- resetParameters();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
deleted file mode 100644
index ee6df94..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-/**
- * Implementation of WriteSinkFunction. Writes tuples to file in every millis
- * milliseconds.
- *
- * @param <IN>
- * Input tuple type
- */
-public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- private final long millis;
- private long lastTime;
-
- public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis) {
- super(path, format);
- this.millis = millis;
- lastTime = System.currentTimeMillis();
- }
-
- @Override
- protected boolean updateCondition() {
- return System.currentTimeMillis() - lastTime >= millis;
- }
-
- @Override
- protected void resetParameters() {
- tupleList.clear();
- lastTime = System.currentTimeMillis();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
deleted file mode 100644
index 2a84c0e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> {
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(FileMonitoringFunction.class);
-
- public enum WatchType {
- ONLY_NEW_FILES, // Only new files will be processed.
- REPROCESS_WITH_APPENDED, // When some files are appended, all contents
- // of the files will be processed.
- PROCESS_ONLY_APPENDED // When some files are appended, only appended
- // contents will be processed.
- }
-
- private String path;
- private long interval;
- private WatchType watchType;
-
- private FileSystem fileSystem;
- private Map<String, Long> offsetOfFiles;
- private Map<String, Long> modificationTimes;
-
- private volatile boolean isRunning = false;
-
- public FileMonitoringFunction(String path, long interval, WatchType watchType) {
- this.path = path;
- this.interval = interval;
- this.watchType = watchType;
- this.modificationTimes = new HashMap<String, Long>();
- this.offsetOfFiles = new HashMap<String, Long>();
- }
-
- @Override
- public void run(Collector<Tuple3<String, Long, Long>> collector) throws Exception {
- isRunning = true;
- fileSystem = FileSystem.get(new URI(path));
-
- while (isRunning) {
- List<String> files = listNewFiles();
- for (String filePath : files) {
- if (watchType == WatchType.ONLY_NEW_FILES
- || watchType == WatchType.REPROCESS_WITH_APPENDED) {
- collector.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
- offsetOfFiles.put(filePath, -1L);
- } else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
- long offset = 0;
- long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen();
- if (offsetOfFiles.containsKey(filePath)) {
- offset = offsetOfFiles.get(filePath);
- }
-
- collector.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
- offsetOfFiles.put(filePath, fileSize);
-
- LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);
- }
- }
-
- Thread.sleep(interval);
- }
- }
-
- private List<String> listNewFiles() throws IOException {
- List<String> files = new ArrayList<String>();
-
- FileStatus[] statuses = fileSystem.listStatus(new Path(path));
-
- for (FileStatus status : statuses) {
- Path filePath = status.getPath();
- String fileName = filePath.getName();
- long modificationTime = status.getModificationTime();
-
- if (!isFiltered(fileName, modificationTime)) {
- files.add(filePath.toString());
- modificationTimes.put(fileName, modificationTime);
- }
- }
- return files;
- }
-
- private boolean isFiltered(String fileName, long modificationTime) {
-
- if ((watchType == WatchType.ONLY_NEW_FILES && modificationTimes.containsKey(fileName))
- || fileName.startsWith(".") || fileName.contains("_COPYING_")) {
- return true;
- } else {
- Long lastModification = modificationTimes.get(fileName);
- if (lastModification == null) {
- return false;
- } else {
- return lastModification >= modificationTime;
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java
deleted file mode 100644
index 0882d9e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.URI;
-
-public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, Long>, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(Tuple3<String, Long, Long> value, Collector<String> out) throws Exception {
- FSDataInputStream stream = FileSystem.get(new URI(value.f0)).open(new Path(value.f0));
- stream.seek(value.f1);
-
- BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
- String line;
-
- try {
- while ((line = reader.readLine()) != null && (value.f2 == -1L || stream.getPos() <= value.f2)) {
- out.collect(line);
- }
- } finally {
- reader.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
deleted file mode 100644
index 9289355..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
-
-public class FileSourceFunction extends RichParallelSourceFunction<String> {
- private static final long serialVersionUID = 1L;
-
- private InputSplitProvider provider;
-
- private InputFormat<String, ?> inputFormat;
-
- private TypeInformation<String> typeInfo;
-
- private volatile boolean isRunning;
-
- public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
- this.inputFormat = format;
- this.typeInfo = typeInfo;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
- this.provider = context.getInputSplitProvider();
- inputFormat.configure(context.getTaskStubParameters());
- }
-
- @Override
- public void run(Collector<String> collector) throws Exception {
- isRunning = true;
- final TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext()
- .getExecutionConfig());
- final Iterator<InputSplit> splitIterator = getInputSplits();
- @SuppressWarnings("unchecked")
- final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat;
- try {
- while (isRunning && splitIterator.hasNext()) {
-
- final InputSplit split = splitIterator.next();
- String record = serializer.createInstance();
-
- format.open(split);
- while (isRunning && !format.reachedEnd()) {
- if ((record = format.nextRecord(record)) != null) {
- collector.collect(record);
- }
- }
-
- }
- collector.close();
- } finally {
- format.close();
- }
- isRunning = false;
- }
-
- private Iterator<InputSplit> getInputSplits() {
-
- return new Iterator<InputSplit>() {
-
- private InputSplit nextSplit;
-
- private boolean exhausted;
-
- @Override
- public boolean hasNext() {
- if (exhausted) {
- return false;
- }
-
- if (nextSplit != null) {
- return true;
- }
-
- InputSplit split = provider.getNextInputSplit();
-
- if (split != null) {
- this.nextSplit = split;
- return true;
- } else {
- exhausted = true;
- return false;
- }
- }
-
- @Override
- public InputSplit next() {
- if (this.nextSplit == null && !hasNext()) {
- throw new NoSuchElementException();
- }
-
- final InputSplit tmp = this.nextSplit;
- this.nextSplit = null;
- return tmp;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
deleted file mode 100644
index db452dc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.flink.util.Collector;
-
-public class FromElementsFunction<T> implements SourceFunction<T> {
- private static final long serialVersionUID = 1L;
-
- Iterable<T> iterable;
-
- private volatile boolean isRunning;
-
- public FromElementsFunction(T... elements) {
- this.iterable = Arrays.asList(elements);
- }
-
- public FromElementsFunction(Collection<T> elements) {
- this.iterable = elements;
- }
-
- public FromElementsFunction(Iterable<T> elements) {
- this.iterable = elements;
- }
-
- @Override
- public void run(Collector<T> collector) throws Exception {
- isRunning = true;
- for (T element : iterable) {
- if (isRunning) {
- collector.collect(element);
- } else {
- break;
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
deleted file mode 100644
index df3b462..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.NumberSequenceIterator;
-
-/**
- * Source Function used to generate the number sequence
- *
- */
-public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
-
- private static final long serialVersionUID = 1L;
-
- private NumberSequenceIterator fullIterator;
- private NumberSequenceIterator splitIterator;
-
- private volatile boolean isRunning;
-
- public GenSequenceFunction(long from, long to) {
- fullIterator = new NumberSequenceIterator(from, to);
- }
-
- @Override
- public void run(Collector<Long> collector) throws Exception {
- isRunning = true;
- while (splitIterator.hasNext() && isRunning) {
- collector.collect(splitIterator.next());
- }
- }
-
- @Override
- public void open(Configuration config) {
- int splitNumber = getRuntimeContext().getIndexOfThisSubtask();
- int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
- splitIterator = fullIterator.split(numOfSubTasks)[splitNumber];
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
deleted file mode 100644
index 664d39a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public interface GenericSourceFunction<T> {
-
- public TypeInformation<T> getType();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
deleted file mode 100644
index 3b96bf9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-/**
- * A stream data source that is executed in parallel. Upon execution, the runtime will
- * execute as many parallel instances of this function function as configured parallelism
- * of the source.
- *
- * <p>This interface acts only as a marker to tell the system that this source may
- * be executed in parallel. When different parallel instances are required to perform
- * different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
- * context, which revels information like the number of parallel tasks, and which parallel
- * task the current instance is.
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
deleted file mode 100644
index 028c06a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Base class for implementing a data source that has access to context information
- * (via {@link #getRuntimeContext()}) and additional life-cycle methods
- * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
- implements ParallelSourceFunction<OUT> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
deleted file mode 100644
index b9331cb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Base class for implementing a parallel data source that has access to context information
- * (via {@link #getRuntimeContext()}) and additional life-cycle methods
- * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.
- *
- * <p>This class is useful when implementing parallel sources where different parallel subtasks
- * need to perform different work. Typical patterns for that are:
- * <ul>
- * <li>Use {@link #getRuntimeContext()} to obtain the runtime context.</li>
- * <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getNumberOfParallelSubtasks()}
- * to determine the current parallelism. It is strongly encouraged to use this method, rather than
- * hard-wiring the parallelism, because the configured parallelism may change depending on
- * program configuration. The parallelism may also change after recovering failures, when fewer than
- * desired parallel worker as available.</li>
- * <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getIndexOfThisSubtask()} to
- * determine which subtask the current instance of the function executes.</li>
- * </ul>
- * </p>
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements SourceFunction<OUT> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
deleted file mode 100644
index d6a5b2b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SocketTextStreamFunction extends RichSourceFunction<String> {
-
- protected static final Logger LOG = LoggerFactory.getLogger(SocketTextStreamFunction.class);
-
- private static final long serialVersionUID = 1L;
-
- private String hostname;
- private int port;
- private char delimiter;
- private long maxRetry;
- private boolean retryForever;
- private Socket socket;
- private static final int CONNECTION_TIMEOUT_TIME = 0;
- private static final int CONNECTION_RETRY_SLEEP = 1000;
-
- private volatile boolean isRunning = false;
-
- public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
- this.hostname = hostname;
- this.port = port;
- this.delimiter = delimiter;
- this.maxRetry = maxRetry;
- this.retryForever = maxRetry < 0;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- socket = new Socket();
- socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
- }
-
- @Override
- public void run(Collector<String> collector) throws Exception {
- streamFromSocket(collector, socket);
- }
-
- public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
- isRunning = true;
- try {
- StringBuffer buffer = new StringBuffer();
- BufferedReader reader = new BufferedReader(new InputStreamReader(
- socket.getInputStream()));
-
- while (isRunning) {
- int data;
- try {
- data = reader.read();
- } catch (SocketException e) {
- if (!isRunning) {
- break;
- } else {
- throw e;
- }
- }
-
- if (data == -1) {
- socket.close();
- long retry = 0;
- boolean success = false;
- while (retry < maxRetry && !success) {
- if (!retryForever) {
- retry++;
- }
- LOG.warn("Lost connection to server socket. Retrying in "
- + (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
- try {
- socket = new Socket();
- socket.connect(new InetSocketAddress(hostname, port),
- CONNECTION_TIMEOUT_TIME);
- success = true;
- } catch (ConnectException ce) {
- Thread.sleep(CONNECTION_RETRY_SLEEP);
- }
- }
-
- if (success) {
- LOG.info("Server socket is reconnected.");
- } else {
- LOG.error("Could not reconnect to server socket.");
- break;
- }
- reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- continue;
- }
-
- if (data == delimiter) {
- collector.collect(buffer.toString());
- buffer = new StringBuffer();
- } else if (data != '\r') { // ignore carriage return
- buffer.append((char) data);
- }
- }
-
- if (buffer.length() > 0) {
- collector.collect(buffer.toString());
- }
- } finally {
- socket.close();
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- if (socket != null && !socket.isClosed()) {
- try {
- socket.close();
- } catch (IOException e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Could not close open socket");
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
deleted file mode 100644
index 0a423cc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.util.Collector;
-
-/**
- * Interface for a stream data source.
- *
- * <p>Sources implementing this specific interface are executed with
- * parallelism 1. To execute your sources in parallel
- * see {@link ParallelSourceFunction}.</p>
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public interface SourceFunction<OUT> extends Function, Serializable {
-
- /**
- * Main work method of the source. This function is invoked at the beginning of the
- * source's life and is expected to produce its data py "pushing" the records into
- * the given collector.
- *
- * @param collector The collector that forwards records to the source's consumers.
- *
- * @throws Exception Throwing any type of exception will cause the source to be considered
- * failed. When fault tolerance is enabled, recovery will be triggered,
- * which may create a new instance of this source.
- */
- public void run(Collector<OUT> collector) throws Exception;
-
- /**
- * This method signals the source function to cancel its operation
- * The method is called by the framework if the task is to be aborted prematurely.
- * This happens when the user cancels the job, or when the task is canceled as
- * part of a program failure and cleanup.
- */
- public void cancel();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java
new file mode 100644
index 0000000..ff045a5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.util.Collector;
+
+/**
+ * Abstract class for defining a rich mapWindow transformation to be applied on
+ * {@link WindowedDataStream}s. The mapWindow function will be called on each
+ * {@link StreamWindow}. <p>In addition, the user can access the functionality
+ * provided by the {@link RichFunction} interface.
+ */
+public abstract class RichWindowMapFunction<IN, OUT> extends AbstractRichFunction implements
+ WindowMapFunction<IN, OUT> {
+
+ private static final long serialVersionUID = 9052714915997374185L;
+
+ @Override
+ public abstract void mapWindow(Iterable<IN> values, Collector<OUT> out) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java
new file mode 100644
index 0000000..ececb29
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.util.Collector;
+
+/**
+ * Interface for defining a mapWindow transformation to be applied on
+ * {@link WindowedDataStream}s. The mapWindow function will be called on each
+ * {@link StreamWindow}.
+ */
+public interface WindowMapFunction<T, O> extends Function, Serializable {
+
+ void mapWindow(Iterable<T> values, Collector<O> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
new file mode 100644
index 0000000..23cca90
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.aggregation;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+
+public abstract class AggregationFunction<T> extends RichReduceFunction<T> {
+ private static final long serialVersionUID = 1L;
+
+ public int position;
+
+ public AggregationFunction(int pos) {
+ this.position = pos;
+ }
+
+ public static enum AggregationType {
+ SUM, MIN, MAX, MINBY, MAXBY,
+ }
+
+}