You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:54 UTC
[38/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
deleted file mode 100644
index e9a739f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-/**
- * A stream data source that is executed in parallel. Upon execution, the runtime will
- * execute as many parallel instances of this function function as configured parallelism
- * of the source.
- *
- * <p>This interface acts only as a marker to tell the system that this source may
- * be executed in parallel. When different parallel instances are required to perform
- * different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
- * context, which reveals information like the number of parallel tasks, and which parallel
- * task the current instance is.
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java
deleted file mode 100644
index 6e0086d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Base class for implementing a parallel event-time data source that has access to context information
- * (via {@link #getRuntimeContext()}) and additional life-cycle methods
- * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.
- *
- * <p>
- * This class is useful when implementing parallel sources where different parallel subtasks
- * need to perform different work. Typical patterns for that are:
- * <ul>
- * <li>Use {@link #getRuntimeContext()} to obtain the runtime context.</li>
- * <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getNumberOfParallelSubtasks()}
- * to determine the current parallelism. It is strongly encouraged to use this method, rather than
- * hard-wiring the parallelism, because the configured parallelism may change depending on
- * program configuration. The parallelism may also change after recovering failures, when fewer than
- * desired parallel worker as available.</li>
- * <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getIndexOfThisSubtask()} to
- * determine which subtask the current instance of the function executes.</li>
- * </ul>
- *
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public abstract class RichEventTimeSourceFunction<OUT> extends AbstractRichFunction implements EventTimeSourceFunction<OUT> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
deleted file mode 100644
index 7cbf674..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Base class for implementing a parallel data source. Upon execution, the runtime will
- * execute as many parallel instances of this function function as configured parallelism
- * of the source.
- *
- * <p>The data source has access to context information (such as the number of parallel
- * instances of the source, and which parallel instance the current instance is)
- * via {@link #getRuntimeContext()}. It also provides additional life-cycle methods
- * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.</p>
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
- implements ParallelSourceFunction<OUT> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java
deleted file mode 100644
index dd08b2a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Base class for implementing a parallel data source that has access to context information
- * (via {@link #getRuntimeContext()}) and additional life-cycle methods
- * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.
- *
- * <p>This class is useful when implementing parallel sources where different parallel subtasks
- * need to perform different work. Typical patterns for that are:
- * <ul>
- * <li>Use {@link #getRuntimeContext()} to obtain the runtime context.</li>
- * <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getNumberOfParallelSubtasks()}
- * to determine the current parallelism. It is strongly encouraged to use this method, rather than
- * hard-wiring the parallelism, because the configured parallelism may change depending on
- * program configuration. The parallelism may also change after recovering failures, when fewer than
- * desired parallel worker as available.</li>
- * <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getIndexOfThisSubtask()} to
- * determine which subtask the current instance of the function executes.</li>
- * </ul>
- * </p>
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements SourceFunction<OUT> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
deleted file mode 100644
index 9310b71..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.runtime.util.IOUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A source function that reads strings from a socket. The source will read bytes from the socket stream
- * and convert them to characters, each byte individually. When the delimiter character is received,
- * the function will output the current string, and begin a new string.
- * <p>
- * The function strips trailing <i>carriage return</i> characters (\r) when the delimiter is the
- * newline character (\n).
- * <p>
- * The function can be set to reconnect to the server socket in case that the stream is closed on the server side.
- */
-public class SocketTextStreamFunction implements SourceFunction<String> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(SocketTextStreamFunction.class);
-
- /** Default delay between successive connection attempts */
- private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
-
- /** Default connection timeout when connecting to the server socket (infinite) */
- private static final int CONNECTION_TIMEOUT_TIME = 0;
-
-
- private final String hostname;
- private final int port;
- private final char delimiter;
- private final long maxNumRetries;
- private final long delayBetweenRetries;
-
- private transient Socket currentSocket;
-
- private volatile boolean isRunning = true;
-
-
- public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxNumRetries) {
- this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
- }
-
- public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
- checkArgument(port > 0 && port < 65536, "port is out of range");
- checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
- checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
-
- this.hostname = checkNotNull(hostname, "hostname must not be null");
- this.port = port;
- this.delimiter = delimiter;
- this.maxNumRetries = maxNumRetries;
- this.delayBetweenRetries = delayBetweenRetries;
- }
-
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- final StringBuilder buffer = new StringBuilder();
- long attempt = 0;
-
- while (isRunning) {
-
- try (Socket socket = new Socket()) {
- currentSocket = socket;
-
- LOG.info("Connecting to server socket " + hostname + ':' + port);
- socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
- BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-
- int data;
- while (isRunning && (data = reader.read()) != -1) {
- // check if the string is complete
- if (data != delimiter) {
- buffer.append((char) data);
- }
- else {
- // truncate trailing carriage return
- if (delimiter == '\n' && buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
- buffer.setLength(buffer.length() - 1);
- }
- ctx.collect(buffer.toString());
- buffer.setLength(0);
- }
- }
- }
-
- // if we dropped out of this loop due to an EOF, sleep and retry
- if (isRunning) {
- attempt++;
- if (maxNumRetries == -1 || attempt < maxNumRetries) {
- LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
- Thread.sleep(delayBetweenRetries);
- }
- else {
- // this should probably be here, but some examples expect simple exists of the stream source
- // throw new EOFException("Reached end of stream and reconnects are not enabled.");
- break;
- }
- }
- }
-
- // collect trailing data
- if (buffer.length() > 0) {
- ctx.collect(buffer.toString());
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
-
- // we need to close the socket as well, because the Thread.interrupt() function will
- // not wake the thread in the socketStream.read() method when blocked.
- Socket theSocket = this.currentSocket;
- if (theSocket != null) {
- IOUtils.closeSocket(theSocket);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
deleted file mode 100644
index 886d6e7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import java.io.Serializable;
-
-/**
- * Base interface for all stream data sources in Flink. The contract of a stream source
- * is the following: When the source should start emitting elements the {@link #run} method
- * is called with a {@link org.apache.flink.util.Collector} that can be used for emitting elements.
- * The run method can run for as long as necessary. The source must, however, react to an
- * invocation of {@link #cancel} by breaking out of its main loop.
- *
- * <p>
- * <b>Note about checkpointed sources</b> <br>
- *
- * Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}
- * interface must ensure that state checkpointing, updating of internal state and emission of
- * elements are not done concurrently. This is achieved by using the provided checkpointing lock
- * object to protect update of state and emission of elements in a synchronized block.
- * </p>
- *
- * <p>
- * This is the basic pattern one should follow when implementing a (checkpointed) source:
- * </p>
- *
- * {@code
- * public class ExampleSource<T> implements SourceFunction<T>, Checkpointed<Long> {
- * private long count = 0L;
- * private volatile boolean isRunning = true;
- *
- * @Override
- * public void run(SourceContext<T> ctx) {
- * while (isRunning && count < 1000) {
- * synchronized (ctx.getCheckpointLock()) {
- * ctx.collect(count);
- * count++;
- * }
- * }
- * }
- *
- * @Override
- * public void cancel() {
- * isRunning = false;
- * }
- *
- * @Override
- * public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; }
- *
- * @Override
- * public void restoreState(Long state) { this.count = state; }
- * }
- * </pre>
- *
- *
- * <p>
- * <b>Note about element timestamps and watermarks:</b> <br>
- * Sources must only manually emit watermarks when they implement
- * {@link EventTimeSourceFunction }.
- * Otherwise, elements automatically get the current timestamp assigned at ingress
- * and the system automatically emits watermarks.
- *
- * @param <T> The type of the elements produced by this source.
- */
-public interface SourceFunction<T> extends Function, Serializable {
-
- /**
- * Starts the source. You can use the {@link org.apache.flink.util.Collector} parameter to emit
- * elements. Sources that implement
- * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed} must lock on the
- * checkpoint lock (using a synchronized block) before updating internal state and/or emitting
- * elements. Also, the update of state and emission of elements must happen in the same
- * synchronized block.
- *
- * @param ctx The context for interaction with the outside world.
- */
- void run(SourceContext<T> ctx) throws Exception;
-
- /**
- * Cancels the source. Most sources will have a while loop inside the
- * {@link #run} method. You need to ensure that the source will break out of this loop. This
- * can be achieved by having a volatile field "isRunning" that is checked in the loop and that
- * is set to false in this method.
- */
- void cancel();
-
- /**
- * Interface that source functions use to communicate with the outside world. Normally
- * sources would just emit elements in a loop using {@link #collect}. If the source is a
- * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed} source it must retrieve
- * the checkpoint lock object and use it to protect state updates and element emission as
- * described in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
- *
- * @param <T> The type of the elements produced by the source.
- */
- public static interface SourceContext<T> {
-
- /**
- * Emits one element from the source. The result of {@link System#currentTimeMillis()} is set as
- * the timestamp of the emitted element.
- *
- * @param element The element to emit
- */
- void collect(T element);
-
- /**
- * Emits one element from the source with the given timestamp.
- *
- * @param element The element to emit
- * @param timestamp The timestamp in milliseconds
- */
- public void collectWithTimestamp(T element, long timestamp);
-
- /**
- * Emits the given {@link org.apache.flink.streaming.api.watermark.Watermark}.
- *
- * <p>
- * <b>Important:</b>
- * Sources must only manually emit watermarks when they implement
- * {@link EventTimeSourceFunction}.
- * Otherwise, elements automatically get the current timestamp assigned at ingress
- * and the system automatically emits watermarks.
- *
- * @param mark The {@link Watermark} to emit
- */
- void emitWatermark(Watermark mark);
-
-
- /**
- * Returns the checkpoint lock. Please refer to the explanation about checkpointed sources
- * in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
- *
- * @return The object to use as the lock.
- */
- Object getCheckpointLock();
-
- /**
- * This must be called when closing the source operator to allow the {@link SourceContext}
- * to clean up internal state.
- */
- void close();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
deleted file mode 100644
index 14badf1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-
-/**
- * A stateful streaming source that emits each number from a given interval exactly once,
- * possibly in parallel.
- */
-public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {
-
- private static final long serialVersionUID = 1L;
-
- private final long start;
- private final long end;
-
- private long collected;
-
- private volatile boolean isRunning = true;
-
- /**
- * Creates a source that emits all numbers from the given interval exactly once.
- *
- * @param start Start of the range of numbers to emit.
- * @param end End of the range of numbers to emit.
- */
- public StatefulSequenceSource(long start, long end) {
- this.start = start;
- this.end = end;
- }
-
- @Override
- public void run(SourceContext<Long> ctx) throws Exception {
- final Object checkpointLock = ctx.getCheckpointLock();
-
- RuntimeContext context = getRuntimeContext();
-
- final long stepSize = context.getNumberOfParallelSubtasks();
- final long congruence = start + context.getIndexOfThisSubtask();
-
- final long toCollect =
- ((end - start + 1) % stepSize > (congruence - start)) ?
- ((end - start + 1) / stepSize + 1) :
- ((end - start + 1) / stepSize);
-
-
- while (isRunning && collected < toCollect) {
- synchronized (checkpointLock) {
- ctx.collect(collected * stepSize + congruence);
- collected++;
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
- @Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return collected;
- }
-
- @Override
- public void restoreState(Long state) {
- collected = state;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
deleted file mode 100644
index 1d54436..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * Base interface for functions that are evaluated over non-keyed windows.
- *
- * @param <IN> The type of the input value.
- * @param <OUT> The type of the output value.
- */
-public interface AllWindowFunction<IN, OUT, W extends Window> extends Function, Serializable {
-
- /**
- * Evaluates the window and outputs none or several elements.
- *
- * @param window The window that is being evaluated.
- * @param values The elements in the window being evaluated.
- * @param out A collector for emitting elements.
- *
- * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
- */
- void apply(W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
deleted file mode 100644
index 69f24fe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class FoldAllWindowFunction<W extends Window, T, R>
- extends WrappingFunction<FoldFunction<T, R>>
- implements AllWindowFunction<T, R, W>, OutputTypeConfigurable<R> {
- private static final long serialVersionUID = 1L;
-
- private byte[] serializedInitialValue;
- private TypeSerializer<R> outSerializer;
- private transient R initialValue;
-
- public FoldAllWindowFunction(R initialValue, FoldFunction<T, R> reduceFunction) {
- super(reduceFunction);
- this.initialValue = initialValue;
- }
-
- @Override
- public void open(Configuration configuration) throws Exception {
- super.open(configuration);
-
- if (serializedInitialValue == null) {
- throw new RuntimeException("No initial value was serialized for the fold " +
- "window function. Probably the setOutputType method was not called.");
- }
-
- ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
- InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
- new DataInputStream(bais)
- );
- initialValue = outSerializer.deserialize(in);
- }
-
- @Override
- public void apply(W window, Iterable<T> values, Collector<R> out) throws Exception {
- R result = outSerializer.copy(initialValue);
-
- for (T val: values) {
- result = wrappedFunction.fold(result, val);
- }
-
- out.collect(result);
- }
-
- @Override
- public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) {
- outSerializer = outTypeInfo.createSerializer(executionConfig);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
- new DataOutputStream(baos)
- );
-
- try {
- outSerializer.serialize(initialValue, out);
- } catch (IOException ioe) {
- throw new RuntimeException("Unable to serialize initial value of type " +
- initialValue.getClass().getSimpleName() + " of fold window function.", ioe);
- }
-
- serializedInitialValue = baos.toByteArray();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
deleted file mode 100644
index 04d2ac7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class FoldWindowFunction<K, W extends Window, T, R>
- extends WrappingFunction<FoldFunction<T, R>>
- implements WindowFunction<T, R, K, W>, OutputTypeConfigurable<R> {
- private static final long serialVersionUID = 1L;
-
- private byte[] serializedInitialValue;
- private TypeSerializer<R> outSerializer;
- private transient R initialValue;
-
- public FoldWindowFunction(R initialValue, FoldFunction<T, R> reduceFunction) {
- super(reduceFunction);
- this.initialValue = initialValue;
- }
-
- @Override
- public void open(Configuration configuration) throws Exception {
- super.open(configuration);
-
- if (serializedInitialValue == null) {
- throw new RuntimeException("No initial value was serialized for the fold " +
- "window function. Probably the setOutputType method was not called.");
- }
-
- ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
- InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
- new DataInputStream(bais)
- );
- initialValue = outSerializer.deserialize(in);
- }
-
- @Override
- public void apply(K k, W window, Iterable<T> values, Collector<R> out) throws Exception {
- R result = outSerializer.copy(initialValue);
-
- for (T val: values) {
- result = wrappedFunction.fold(result, val);
- }
-
- out.collect(result);
- }
-
- @Override
- public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) {
- outSerializer = outTypeInfo.createSerializer(executionConfig);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
- new DataOutputStream(baos)
- );
-
- try {
- outSerializer.serialize(initialValue, out);
- } catch (IOException ioe) {
- throw new RuntimeException("Unable to serialize initial value of type " +
- initialValue.getClass().getSimpleName() + " of fold window function.", ioe);
- }
-
- serializedInitialValue = baos.toByteArray();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
deleted file mode 100644
index 24855a5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-public class ReduceAllWindowFunction<W extends Window, T> extends RichAllWindowFunction<T, T, W> {
- private static final long serialVersionUID = 1L;
-
- private final ReduceFunction<T> reduceFunction;
-
- public ReduceAllWindowFunction(ReduceFunction<T> reduceFunction) {
- this.reduceFunction = reduceFunction;
- }
-
- @Override
- public void setRuntimeContext(RuntimeContext ctx) {
- super.setRuntimeContext(ctx);
- FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- FunctionUtils.openFunction(reduceFunction, parameters);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- FunctionUtils.closeFunction(reduceFunction);
- }
-
- @Override
- public void apply(W window, Iterable<T> values, Collector<T> out) throws Exception {
- T result = null;
-
- for (T v: values) {
- if (result == null) {
- result = v;
- } else {
- result = reduceFunction.reduce(result, v);
- }
- }
-
- if (result != null) {
- out.collect(result);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
deleted file mode 100644
index edd8a34..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-public class ReduceWindowFunction<K, W extends Window, T>
- extends WrappingFunction<ReduceFunction<T>>
- implements WindowFunction<T, T, K, W> {
- private static final long serialVersionUID = 1L;
-
- public ReduceWindowFunction(ReduceFunction<T> reduceFunction) {
- super(reduceFunction);
- }
-
- @Override
- public void apply(K k, W window, Iterable<T> values, Collector<T> out) throws Exception {
- T result = null;
-
- for (T v: values) {
- if (result == null) {
- result = v;
- } else {
- result = wrappedFunction.reduce(result, v);
- }
- }
-
- if (result != null) {
- out.collect(result);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
deleted file mode 100644
index 6a472b1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends RichWindowFunction<T, Tuple2<W, T>, K, W> {
- private static final long serialVersionUID = 1L;
-
- private final ReduceFunction<T> reduceFunction;
-
- public ReduceWindowFunctionWithWindow(ReduceFunction<T> reduceFunction) {
- this.reduceFunction = reduceFunction;
- }
-
- @Override
- public void setRuntimeContext(RuntimeContext ctx) {
- super.setRuntimeContext(ctx);
- FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- FunctionUtils.openFunction(reduceFunction, parameters);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- FunctionUtils.closeFunction(reduceFunction);
- }
-
- @Override
- public void apply(K k, W window, Iterable<T> values, Collector<Tuple2<W, T>> out) throws Exception {
- T result = null;
-
- for (T v: values) {
- if (result == null) {
- result = v;
- } else {
- result = reduceFunction.reduce(result, v);
- }
- }
-
- if (result != null) {
- out.collect(Tuple2.of(window, result));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java
deleted file mode 100644
index d78e2c3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-public abstract class RichAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction implements AllWindowFunction<IN, OUT, W> {
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
deleted file mode 100644
index 0d40bbd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-public abstract class RichWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction implements WindowFunction<IN, OUT, KEY, W> {
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
deleted file mode 100644
index eda12c0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * Base interface for functions that are evaluated over keyed (grouped) windows.
- *
- * @param <IN> The type of the input value.
- * @param <OUT> The type of the output value.
- * @param <KEY> The type of the key.
- */
-public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
-
- /**
- * Evaluates the window and outputs none or several elements.
- *
- * @param key The key for which this window is evaluated.
- * @param window The window that is being evaluated.
- * @param values The elements in the window being evaluated.
- * @param out A collector for emitting elements.
- *
- * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
- */
- void apply(KEY key, W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
deleted file mode 100644
index 86a12e2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta;
-
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.Extractor;
-
-/**
- * This delta function calculates the cosine distance between two given vectors.
- * The cosine distance is defined as: cosineDistance=1-cosineSimilarity
- *
- * Cosine similarity: http://en.wikipedia.org/wiki/Cosine_similarity
- *
- * @param <DATA>
- * The input data type. This delta function works with a double[],
- * but can extract/convert to it from any other given object in case
- * the respective extractor has been set. See
- * {@link ExtractionAwareDeltaFunction} for more information.
- */
-public class CosineDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> {
-
- /**
- * auto-generated id
- */
- private static final long serialVersionUID = -1217813582965151599L;
-
- public CosineDistance() {
- super(null);
- }
-
- public CosineDistance(Extractor<DATA, double[]> converter) {
- super(converter);
- }
-
- @Override
- public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
- if (isNullvector(oldDataPoint, newDataPoint)) {
- return 0;
- }
-
- if (oldDataPoint.length != newDataPoint.length) {
- throw new IllegalArgumentException(
- "The size of two input arrays are not same, can not compute cosine distance");
- }
-
- double sum1 = 0;
- double sum2 = 0;
- for (int i = 0; i < oldDataPoint.length; i++) {
- sum1 += oldDataPoint[i] * oldDataPoint[i];
- sum2 += newDataPoint[i] * newDataPoint[i];
- }
- sum1 = Math.sqrt(sum1);
- sum2 = Math.sqrt(sum2);
-
- return 1d - (dotProduct(oldDataPoint, newDataPoint) / (sum1 * sum2));
- }
-
- private double dotProduct(double[] a, double[] b) {
- double result = 0;
- for (int i = 0; i < a.length; i++) {
- result += a[i] * b[i];
- }
- return result;
- }
-
- private boolean isNullvector(double[]... vectors) {
- outer: for (double[] v : vectors) {
- for (double field : v) {
- if (field != 0) {
- continue outer;
- }
- }
- // This position is only reached in case all fields are 0.
- return true;
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java
deleted file mode 100644
index 0ce2bf9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta;
-
-import java.io.Serializable;
-
-/**
- * This interface allows the implementation of a function which calculates the
- * delta between two data points. Delta functions might be used in delta
- * policies and allow flexible adaptive windowing based on the arriving data
- * points.
- *
- * @param <DATA>
- * The type of input data which can be compared using this function.
- */
-public interface DeltaFunction<DATA> extends Serializable {
-
- /**
- * Calculates the delta between two given data points.
- *
- * @param oldDataPoint
- * the old data point.
- * @param newDataPoint
- * the new data point.
- * @return the delta between the two given points.
- */
- public double getDelta(DATA oldDataPoint, DATA newDataPoint);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
deleted file mode 100644
index 23efbf2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta;
-
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.Extractor;
-
-/**
- * This delta function calculates the euclidean distance between two given
- * points.
- *
- * Euclidean distance: http://en.wikipedia.org/wiki/Euclidean_distance
- *
- * @param <DATA>
- * The input data type. This delta function works with a double[],
- * but can extract/convert to it from any other given object in case
- * the respective extractor has been set. See
- * {@link ExtractionAwareDeltaFunction} for more information.
- */
-public class EuclideanDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> {
-
- public EuclideanDistance() {
- super(null);
- }
-
- public EuclideanDistance(Extractor<DATA, double[]> converter) {
- super(converter);
- }
-
- /**
- * auto-generated version id
- */
- private static final long serialVersionUID = 3119432599634512359L;
-
- @Override
- public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
- double result = 0;
- for (int i = 0; i < oldDataPoint.length; i++) {
- result += (oldDataPoint[i] - newDataPoint[i]) * (oldDataPoint[i] - newDataPoint[i]);
- }
- return Math.sqrt(result);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
deleted file mode 100644
index 7a4e01a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta;
-
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.Extractor;
-
-/**
- * Extend this abstract class to implement a delta function which is aware of
- * extracting the data on which the delta is calculated from a more complex data
- * structure. For example in case you want to be able to run a delta only on one
- * field of a Tuple type or only on some fields from an array.
- *
- * @param <DATA>
- * The input data type. The input of this type will be passed to the
- * extractor which will transform into a TO-object. The delta
- * function then runs on this TO-object.
- * @param <TO>
- * The type on which the delta function runs. (The type of the delta
- * function)
- */
-public abstract class ExtractionAwareDeltaFunction<DATA, TO> implements DeltaFunction<DATA> {
-
- /**
- * Generated Version ID
- */
- private static final long serialVersionUID = 6927486219702689554L;
- private Extractor<DATA, TO> converter;
-
- public ExtractionAwareDeltaFunction(Extractor<DATA, TO> converter) {
- this.converter = converter;
- }
-
- /**
- * This method takes the two data point and runs the set extractor on it.
- * The delta function implemented at {@link #getNestedDelta} is then called
- * with the extracted data. In case no extractor is set the input data gets
- * passes to {@link #getNestedDelta} as-is. The return value is just
- * forwarded from {@link #getNestedDelta}.
- *
- * @param oldDataPoint
- * the older data point as raw data (before extraction).
- * @param newDataPoint
- * the new data point as raw data (before extraction).
- * @return the delta between the two points.
- */
- @SuppressWarnings("unchecked")
- @Override
- public double getDelta(DATA oldDataPoint, DATA newDataPoint) {
- if (converter == null) {
- // In case no conversion/extraction is required, we can cast DATA to
- // TO
- // => Therefore, "unchecked" warning is suppressed for this method.
- return getNestedDelta((TO) oldDataPoint, (TO) newDataPoint);
- } else {
- return getNestedDelta(converter.extract(oldDataPoint), converter.extract(newDataPoint));
- }
-
- }
-
- /**
- * This method is exactly the same as
- * {@link DeltaFunction#getDelta(Object, Object)} except that it gets the
- * result of the previously done extractions as input. Therefore, this
- * method only does the actual calculation of the delta but no data
- * extraction or conversion.
- *
- * @param oldDataPoint
- * the older data point.
- * @param newDataPoint
- * the new data point.
- * @return the delta between the two points.
- */
- public abstract double getNestedDelta(TO oldDataPoint, TO newDataPoint);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java
deleted file mode 100644
index baceba4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Converts a Tuple to an Object-Array. The field which should be included in
- * the array can selected and reordered as needed.
- */
-public class ArrayFromTuple implements Extractor<Tuple, Object[]> {
-
- /**
- * Auto generated version id
- */
- private static final long serialVersionUID = -6076121226427616818L;
- int[] order = null;
-
- /**
- * Using this constructor the extractor will convert the whole tuple (all
- * fields in the original order) to an array.
- */
- public ArrayFromTuple() {
- // noting to do
- }
-
- /**
- * Using this constructor the extractor will combine the fields as specified
- * in the indexes parameter in an object array.
- *
- * @param indexes
- * the field ids (enumerated from 0)
- */
- public ArrayFromTuple(int... indexes) {
- this.order = indexes;
- }
-
- @Override
- public Object[] extract(Tuple in) {
- Object[] output;
-
- if (order == null) {
- // copy the whole tuple
- output = new Object[in.getArity()];
- for (int i = 0; i < in.getArity(); i++) {
- output[i] = in.getField(i);
- }
- } else {
- // copy user specified order
- output = new Object[order.length];
- for (int i = 0; i < order.length; i++) {
- output[i] = in.getField(order[i]);
- }
- }
-
- return output;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java
deleted file mode 100644
index 89c3a32..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-/**
- * Combines two extractors which will be executed one after each other.
- *
- * @param <FROM>
- * The input type of the first extractor.
- * @param <OVER>
- * The output type of the first and the input type of the second
- * extractor.
- * @param <TO>
- * The output type of the second extractor and the output type of the
- * over all extraction.
- */
-public class ConcatenatedExtract<FROM, OVER, TO> implements Extractor<FROM, TO> {
-
- /**
- * auto-generated id
- */
- private static final long serialVersionUID = -7807197760725651752L;
-
- private Extractor<FROM, OVER> e1;
- private Extractor<OVER, TO> e2;
-
- /**
- * Combines two extractors which will be executed one after each other.
- *
- * @param e1
- * First extractor: This extractor gets applied to the input data
- * first. Its output as then passed as input to the second
- * extractor.
- * @param e2
- * Second extractor: This extractor gets the output of the first
- * extractor as input. Its output is then the result of the over
- * all extraction.
- */
- public ConcatenatedExtract(Extractor<FROM, OVER> e1, Extractor<OVER, TO> e2) {
- this.e1 = e1;
- this.e2 = e2;
- }
-
- @Override
- public TO extract(FROM in) {
- return e2.extract(e1.extract(in));
- }
-
- public <OUT> ConcatenatedExtract<FROM, TO, OUT> add(Extractor<TO, OUT> e3) {
- return new ConcatenatedExtract<FROM, TO, OUT>(this, e3);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java
deleted file mode 100644
index 8cd0014..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import java.io.Serializable;
-
-/**
- * Extractors allow to extract/convert one type to another. They are mostly used
- * to extract some fields out of a more complex structure (Tuple/Array) to run
- * further calculation on the extraction result.
- *
- * @param <FROM>
- * The input data type.
- * @param <TO>
- * The output data type.
- */
-public interface Extractor<FROM, TO> extends Serializable {
-
- /**
- * Extracts/Converts the given input to an object of the output type
- *
- * @param in
- * the input data
- * @return the extracted/converted data
- */
- public TO extract(FROM in);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java
deleted file mode 100644
index f9d0a2b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import java.lang.reflect.Array;
-
-/**
- * Extracts a single field out of an array.
- *
- * @param <OUT>
- * The type of the extracted field.
- */
-public class FieldFromArray<OUT> implements Extractor<Object, OUT> {
-
- /**
- * Auto-gernated version id
- */
- private static final long serialVersionUID = -5161386546695574359L;
- private int fieldId = 0;
-
- /**
- * Extracts the first field (id 0) from the array
- */
- public FieldFromArray() {
- // noting to do => will use default 0
- }
-
- /**
- * Extracts the field with the given id from the array.
- *
- * @param fieldId
- * The id of the field which will be extracted from the array.
- */
- public FieldFromArray(int fieldId) {
- this.fieldId = fieldId;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public OUT extract(Object in) {
- return (OUT) Array.get(in, fieldId);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java
deleted file mode 100644
index 627afca..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Extracts a single field out of a tuple.
- *
- * @param <OUT>
- * The type of the extracted field.
- */
-public class FieldFromTuple<OUT> implements Extractor<Tuple, OUT> {
-
- /**
- * Auto-gernated version id
- */
- private static final long serialVersionUID = -5161386546695574359L;
- private int fieldId = 0;
-
- /**
- * Extracts the first field (id 0) from the tuple
- */
- public FieldFromTuple() {
- // noting to do => will use default 0
- }
-
- /**
- * Extracts the field with the given id from the tuple.
- *
- * @param fieldId
- * The id of the field which will be extracted from the tuple.
- */
- public FieldFromTuple(int fieldId) {
- this.fieldId = fieldId;
- }
-
- @Override
- public OUT extract(Tuple in) {
- return in.getField(fieldId);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java
deleted file mode 100644
index b1c080e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import java.lang.reflect.Array;
-
-/**
- * Extracts multiple fields from an array and puts them into a new array of the
- * specified type.
- *
- * @param <OUT>
- * The type of the output array. If out is set to String, the output
- * of the extractor will be a String[]. If it is set to String[] the
- * output will be String[][].
- */
-public class FieldsFromArray<OUT> implements Extractor<Object, OUT[]> {
-
- /**
- * Auto-generated version id
- */
- private static final long serialVersionUID = 8075055384516397670L;
- private int[] order;
- private Class<OUT> clazz;
-
- /**
- * Extracts multiple fields from an array and puts them in the given order
- * into a new array of the specified type.
- *
- * @param clazz
- * the Class object representing the component type of the new
- * array
- * @param indexes
- * The indexes of the fields to be extracted. Any order is
- * possible, but not more than 255 fields due to limitations in
- * {@link Array#newInstance(Class, int...)}.
- */
- public FieldsFromArray(Class<OUT> clazz, int... indexes) {
- this.order = indexes;
- this.clazz = clazz;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public OUT[] extract(Object in) {
- OUT[] output = (OUT[]) Array.newInstance(clazz, order.length);
- for (int i = 0; i < order.length; i++) {
- output[i] = (OUT) Array.get(in, this.order[i]);
- }
- return output;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java
deleted file mode 100644
index fc7f3ab..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Extracts one or more fields of the type Double from a tuple and puts them
- * into a new double[]
- */
-public class FieldsFromTuple implements Extractor<Tuple, double[]> {
-
- /**
- * auto generated version id
- */
- private static final long serialVersionUID = -2554079091050273761L;
- int[] indexes;
-
- /**
- * Extracts one or more fields of the the type Double from a tuple and puts
- * them into a new double[] (in the specified order).
- *
- * @param indexes
- * The indexes of the fields to be extracted.
- */
- public FieldsFromTuple(int... indexes) {
- this.indexes = indexes;
- }
-
- @Override
- public double[] extract(Tuple in) {
- double[] out = new double[indexes.length];
- for (int i = 0; i < indexes.length; i++) {
- out[i] = (Double) in.getField(indexes[i]);
- }
- return out;
- }
-}