You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:04:00 UTC
[44/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/pom.xml b/flink-staging/flink-streaming/flink-streaming-core/pom.xml
deleted file mode 100644
index cebb538..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/pom.xml
+++ /dev/null
@@ -1,116 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-parent</artifactId>
- <version>0.10-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-streaming-core</artifactId>
- <name>flink-streaming-core</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-math</artifactId>
- <version>2.2</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.sling</groupId>
- <artifactId>org.apache.sling.commons.json</artifactId>
- <version>2.0.6</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-math3</artifactId>
- <version>3.5</version>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <!-- disable fork reuse for the streaming project, because of
- incorrect declaration of tests -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <reuseForks>false</reuseForks>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
deleted file mode 100644
index db46d00..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-/**
- * The checkpointing mode defines what consistency guarantees the system gives in the presence of
- * failures.
- *
- * <p>When checkpointing is activated, the data streams are replayed such that lost parts of the
- * processing are repeated. For stateful operations and functions, the checkpointing mode defines
- * whether the system draws checkpoints such that a recovery behaves as if the operators/functions
- * see each record "exactly once" ({@link #EXACTLY_ONCE}), or whether the checkpoints are drawn
- * in a simpler fashion that typically encounters some duplicates upon recovery
- * ({@link #AT_LEAST_ONCE})</p>
- */
-public enum CheckpointingMode {
-
- /**
- * Sets the checkpointing mode to "exactly once". This mode means that the system will
- * checkpoint the operator and user function state in such a way that, upon recovery,
- * every record will be reflected exactly once in the operator state.
- *
- * <p>For example, if a user function counts the number of elements in a stream,
- * this number will consistently be equal to the number of actual elements in the stream,
- * regardless of failures and recovery.</p>
- *
- * <p>Note that this does not mean that each record flows through the streaming data flow
- * only once. It means that upon recovery, the state of operators/functions is restored such
- * that the resumed data streams pick up exactly after the last modification to the state.</p>
- *
- * <p>Note that this mode does not guarantee exactly-once behavior in the interaction with
- * external systems (only state in Flink's operators and user functions). The reason for that
- * is that a certain level of "collaboration" is required between two systems to achieve
- * exactly-once guarantees. However, for certain systems, connectors can be written that facilitate
- * this collaboration.</p>
- *
- * <p>This mode sustains high throughput. Depending on the data flow graph and operations,
- * this mode may increase the record latency, because operators need to align their input
- * streams, in order to create a consistent snapshot point. The latency increase for simple
- * dataflows (no repartitioning) is negligible. For simple dataflows with repartitioning, the average
- * latency remains small, but the slowest records typically have an increased latency.</p>
- */
- EXACTLY_ONCE,
-
- /**
- * Sets the checkpointing mode to "at least once". This mode means that the system will
- * checkpoint the operator and user function state in a simpler way. Upon failure and recovery,
- * some records may be reflected multiple times in the operator state.
- *
- * <p>For example, if a user function counts the number of elements in a stream,
- * this number will be equal to, or larger than, the actual number of elements in the stream,
- * in the presence of failure and recovery.</p>
- *
- * <p>This mode has minimal impact on latency and may be preferable in very-low latency
- * scenarios, where a sustained very-low latency (such as few milliseconds) is needed,
- * and where occasional duplicate messages (on recovery) do not matter.</p>
- */
- AT_LEAST_ONCE
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
deleted file mode 100644
index 125ca65..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-/**
- * The time characteristic defines how the system determines time for time-dependent
- * order and operations that depend on time (such as time windows).
- */
-public enum TimeCharacteristic {
-
- /**
- * Processing time for operators means that the operator uses the system clock of the machine
- * to determine the current time of the data stream. Processing-time windows trigger based
- * on wall-clock time and include whatever elements happen to have arrived at the operator at
- * that point in time.
- * <p>
- * Using processing time for window operations results in general in quite non-deterministic results,
- * because the contents of the windows depend on the speed at which elements arrive. It is, however,
- * the cheapest method of forming windows and the method that introduces the least latency.
- */
- ProcessingTime,
-
- /**
- * Ingestion time means that the time of each individual element in the stream is determined
- * when the element enters the Flink streaming data flow. Operations like windows group the
- * elements based on that time, meaning that processing speed within the streaming dataflow
- * does not affect windowing, but only the speed at which sources receive elements.
- * <p>
- * Ingestion time is often a good compromise between processing time and event time.
- * It does not need any special manual form of watermark generation, and events are typically
- * not too much out-of-order when they arrive at operators; in fact, out-of-orderness can
- * only be introduced by streaming shuffles or split/join/union operations. The fact that elements
- * are not very much out-of-order means that the latency increase is moderate, compared to event
- * time.
- */
- IngestionTime,
-
- /**
- * Event time means that the time of each individual element in the stream (also called event)
- * is determined by the event's individual custom timestamp. These timestamps either exist in the
- * elements from before they entered the Flink streaming dataflow, or are user-assigned at the sources.
- * The big implication of this is that elements arrive in the sources and in all operators generally
- * out of order, meaning that elements with earlier timestamps may arrive after elements with
- * later timestamps.
- * <p>
- * Operators that window or order data with respect to event time must buffer data until they can
- * be sure that all timestamps for a certain time interval have been received. This is handled by
- * the so called "time watermarks".
- * <p>
- * Operations based on event time are very predictable - the result of windowing operations
- * is typically identical no matter when the window is executed and how fast the streams operate.
- * At the same time, the buffering and tracking of event time is also costlier than operating
- * with processing time, and typically also introduces more latency. The amount of extra
- * cost depends mostly on how much out of order the elements arrive, i.e., how long the time span
- * between the arrival of early and late elements is. With respect to the "time watermarks", this
- * means that the cost typically depends on how early or late the watermarks can be generated
- * for their timestamp.
- * <p>
- * In relation to {@link #IngestionTime}, the event time is similar, but refers to the event's
- * original time, rather than the time assigned at the data source. Practically, that means that
- * event time has generally more meaning, but also that it takes longer to determine that all
- * elements for a certain time have arrived.
- */
- EventTime
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java
deleted file mode 100644
index c2d2182..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.checkpoint;
-
-/**
- * This interface must be implemented by functions/operations that want to receive
- * a commit notification once a checkpoint has been completely acknowledged by all
- * participants.
- */
-public interface CheckpointNotifier {
-
- /**
- * This method is called as a notification once a distributed checkpoint has been completed.
- *
- * Note that any exception during this method will not cause the checkpoint to
- * fail any more.
- *
- * @param checkpointId The ID of the checkpoint that has been completed.
- * @throws Exception
- */
- void notifyCheckpointComplete(long checkpointId) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
deleted file mode 100644
index ac1cbfb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.checkpoint;
-
-import java.io.Serializable;
-
-/**
- * This interface must be implemented by functions that have state that needs to be
- * checkpointed. The functions get a call whenever a checkpoint should take place
- * and return a snapshot of their state, which will be checkpointed.
- *
- * <p>This interface marks a function as <i>synchronously</i> checkpointed. While the
- * state is written, the function is not called, so the function needs not return a
- * copy of its state, but may return a reference to its state. Functions that can
- * continue to work and mutate the state, even while the state snapshot is being accessed,
- * can implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously}
- * interface.</p>
- *
- * @param <T> The type of the operator state.
- */
-public interface Checkpointed<T extends Serializable> {
-
- /**
- * Gets the current state of the function or operator. The state must reflect the result of all
- * prior invocations to this function.
- *
- * @param checkpointId The ID of the checkpoint.
- * @param checkpointTimestamp The timestamp of the checkpoint, as derived by
- * System.currentTimeMillis() on the JobManager.
- *
- * @return A snapshot of the operator state.
- *
- * @throws Exception Thrown if the creation of the state object failed. This causes the
- * checkpoint to fail. The system may decide to fail the operation (and trigger
- * recovery), or to discard this checkpoint attempt and to continue running
- * and to try again with the next checkpoint attempt.
- */
- T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
-
- /**
- * Restores the state of the function or operator to that of a previous checkpoint.
- * This method is invoked when a function is executed as part of a recovery run.
- *
- * Note that restoreState() is called before open().
- *
- * @param state The state to be restored.
- */
- void restoreState(T state) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
deleted file mode 100644
index 4bd89c4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.checkpoint;
-
-import java.io.Serializable;
-
-/**
- * This interface marks a function/operator as <i>asynchronously checkpointed</i>.
- * Similar to the {@link Checkpointed} interface, the function must produce a
- * snapshot of its state. However, the function must be able to continue working
- * and mutating its state without mutating the returned state snapshot.
- *
- * <p>Asynchronous checkpoints are desirable, because they allow the data streams at the
- * point of the checkpointed function/operator to continue running while the checkpoint
- * is in progress.</p>
- *
- * <p>To be able to support asynchronous snapshots, the state returned by the
- * {@link #snapshotState(long, long)} method is typically a copy or shadow copy
- * of the actual state.</p>
- */
-public interface CheckpointedAsynchronously<T extends Serializable> extends Checkpointed<T> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
deleted file mode 100644
index 7034b11..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.util.ArrayList;
-
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private final ArrayList<Collector<StreamRecord<OUT>>> outputs;
-
- public BroadcastOutputSelectorWrapper() {
- outputs = new ArrayList<Collector<StreamRecord<OUT>>>();
- }
-
- @Override
- public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) {
- outputs.add(output);
- }
-
- @Override
- public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) {
- return outputs;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
deleted file mode 100644
index 84558fc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DirectedOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(DirectedOutputSelectorWrapper.class);
-
- private List<OutputSelector<OUT>> outputSelectors;
-
- private HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>> outputMap;
- private HashSet<Collector<StreamRecord<OUT>>> selectAllOutputs;
-
- public DirectedOutputSelectorWrapper(List<OutputSelector<OUT>> outputSelectors) {
- this.outputSelectors = outputSelectors;
- this.selectAllOutputs = new HashSet<Collector<StreamRecord<OUT>>>();
- this.outputMap = new HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>>();
- }
-
- @Override
- public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) {
- List<String> selectedNames = edge.getSelectedNames();
-
- if (selectedNames.isEmpty()) {
- selectAllOutputs.add(output);
- }
- else {
- for (String selectedName : selectedNames) {
- if (!outputMap.containsKey(selectedName)) {
- outputMap.put(selectedName, new ArrayList<Collector<StreamRecord<OUT>>>());
- outputMap.get(selectedName).add(output);
- }
- else {
- if (!outputMap.get(selectedName).contains(output)) {
- outputMap.get(selectedName).add(output);
- }
- }
- }
- }
- }
-
- @Override
- public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) {
- Set<Collector<StreamRecord<OUT>>> selectedOutputs = new HashSet<Collector<StreamRecord<OUT>>>(selectAllOutputs);
-
- for (OutputSelector<OUT> outputSelector : outputSelectors) {
- Iterable<String> outputNames = outputSelector.select(record);
-
- for (String outputName : outputNames) {
- List<Collector<StreamRecord<OUT>>> outputList = outputMap.get(outputName);
-
- try {
- selectedOutputs.addAll(outputList);
- } catch (NullPointerException e) {
- if (LOG.isErrorEnabled()) {
- String format = String.format(
- "Cannot emit because no output is selected with the name: %s",
- outputName);
- LOG.error(format);
- }
- }
- }
- }
-
- return selectedOutputs;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
deleted file mode 100644
index 9c6eede..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-
-/**
- * Interface for defining an OutputSelector for a {@link SplitStream} using
- * the {@link SingleOutputStreamOperator#split} call. Every output object of a
- * {@link SplitStream} will run through this operator to select outputs.
- *
- * @param <OUT>
- * Type parameter of the split values.
- */
-public interface OutputSelector<OUT> extends Serializable {
- /**
- * Method for selecting output names for the emitted objects when using the
- * {@link SingleOutputStreamOperator#split} method. The values will be
- * emitted only to output names which are contained in the returned
- * iterable.
- *
- * @param value
- * Output object for which the output selection should be made.
- */
- public Iterable<String> select(OUT value);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
deleted file mode 100644
index f25c995..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-public interface OutputSelectorWrapper<OUT> extends Serializable {
-
- public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge);
-
- public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
deleted file mode 100644
index dca2ede..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.util.List;
-
-public class OutputSelectorWrapperFactory {
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static OutputSelectorWrapper<?> create(List<OutputSelector<?>> outputSelectors) {
- if (outputSelectors.size() == 0) {
- return new BroadcastOutputSelectorWrapper();
- } else {
- return new DirectedOutputSelectorWrapper(outputSelectors);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
deleted file mode 100644
index 7191304..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ /dev/null
@@ -1,556 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.functions.windowing.FoldAllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-
-/**
- * A {@code AllWindowedStream} represents a data stream where the stream of
- * elements is split into windows based on a
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
- * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
- *
- * <p>
- * If an {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} is specified it will be
- * used to evict elements from the window after
- * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
- * When using an evictor, window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- *
- * <p>
- * Note that the {@code AllWindowedStream} is purely an API construct; during runtime
- * the {@code AllWindowedStream} will be collapsed together with the
- * operation over the window into one single operation.
- *
- * @param <T> The type of elements in the stream.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
- */
-public class AllWindowedStream<T, W extends Window> {
-
- /** The data stream that is windowed by this stream */
- private final DataStream<T> input;
-
- /** The window assigner */
- private final WindowAssigner<? super T, W> windowAssigner;
-
- /** The trigger that is used for window evaluation/emission. */
- private Trigger<? super T, ? super W> trigger;
-
- /** The evictor that is used for evicting elements before window evaluation. */
- private Evictor<? super T, ? super W> evictor;
-
-
- public AllWindowedStream(DataStream<T> input,
- WindowAssigner<? super T, W> windowAssigner) {
- this.input = input;
- this.windowAssigner = windowAssigner;
- this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
- }
-
- /**
- * Sets the {@code Trigger} that should be used to trigger window emission.
- */
- public AllWindowedStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
- this.trigger = trigger;
- return this;
- }
-
- /**
- * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
- *
- * <p>
- * Note: When using an evictor, window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- */
- public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
- this.evictor = evictor;
- return this;
- }
-
-
- // ------------------------------------------------------------------------
- // Operations on the (non-keyed) windows
- // ------------------------------------------------------------------------
-
- /**
- * Applies a reduce function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the reduce function is interpreted
- * as a regular non-windowed stream.
- * <p>
- * This window will try and pre-aggregate data as much as the window policies permit. For example,
- * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
- * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
- * so a few elements are stored per key (one per slide interval).
- * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
- * aggregation tree.
- *
- * @param function The reduce function.
- * @return The data stream that is the result of applying the reduce function to the window.
- */
- public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
- //clean the closure
- function = input.getExecutionEnvironment().clean(function);
-
- String callLocation = Utils.getCallLocationName();
- String udfName = "Reduce at " + callLocation;
-
- SingleOutputStreamOperator<T, ?> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
- if (result != null) {
- return result;
- }
-
- String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
- OneInputStreamOperator<T, T> operator;
-
- boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
- if (evictor != null) {
- operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new HeapWindowBuffer.Factory<T>(),
- new ReduceAllWindowFunction<W, T>(function),
- trigger,
- evictor).enableSetProcessingTime(setProcessingTime);
-
- } else {
- // we need to copy because we need our own instance of the pre aggregator
- @SuppressWarnings("unchecked")
- ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
-
- operator = new NonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
- new ReduceAllWindowFunction<W, T>(function),
- trigger).enableSetProcessingTime(setProcessingTime);
- }
-
- return input.transform(opName, input.getType(), operator).setParallelism(1);
- }
-
- /**
- * Applies the given fold function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the fold function is
- * interpreted as a regular non-windowed stream.
- *
- * @param function The fold function.
- * @return The data stream that is the result of applying the fold function to the window.
- */
- public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function) {
- //clean the closure
- function = input.getExecutionEnvironment().clean(function);
-
- TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
- Utils.getCallLocationName(), true);
-
- return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType);
- }
-
- /**
- * Applies the given fold function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the fold function is
- * interpreted as a regular non-windowed stream.
- *
- * @param function The fold function.
- * @return The data stream that is the result of applying the fold function to the window.
- */
- public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
- //clean the closure
- function = input.getExecutionEnvironment().clean(function);
- return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType);
- }
-
- /**
- * Applies a window function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the window function is interpreted
- * as a regular non-windowed stream.
- * <p>
- * Note that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
- *
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function) {
- TypeInformation<T> inType = input.getType();
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, AllWindowFunction.class, true, true, inType, null, false);
-
- return apply(function, resultType);
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each evaluation
- * of the window for each key individually. The output of the window function is interpreted
- * as a regular non-windowed stream.
- * <p>
- * Note that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
- *
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
- //clean the closure
- function = input.getExecutionEnvironment().clean(function);
-
- String callLocation = Utils.getCallLocationName();
- String udfName = "WindowApply at " + callLocation;
-
- SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
- if (result != null) {
- return result;
- }
-
-
- String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
- NonKeyedWindowOperator<T, R, W> operator;
-
- boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
- if (evictor != null) {
- operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new HeapWindowBuffer.Factory<T>(),
- function,
- trigger,
- evictor).enableSetProcessingTime(setProcessingTime);
-
- } else {
- operator = new NonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new HeapWindowBuffer.Factory<T>(),
- function,
- trigger).enableSetProcessingTime(setProcessingTime);
- }
-
- return input.transform(opName, resultType, operator).setParallelism(1);
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * <p>
- * Arriving data is pre-aggregated using the given pre-aggregation reducer.
- *
- * @param preAggregator The reduce function that is used for pre-aggregation
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
-
- public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function) {
- TypeInformation<T> inType = input.getType();
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, AllWindowFunction.class, true, true, inType, null, false);
-
- return apply(preAggregator, function, resultType);
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * <p>
- * Arriving data is pre-aggregated using the given pre-aggregation reducer.
- *
- * @param preAggregator The reduce function that is used for pre-aggregation
- * @param function The window function.
- * @param resultType Type information for the result type of the window function
- * @return The data stream that is the result of applying the window function to the window.
- */
- public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
- //clean the closures
- function = input.getExecutionEnvironment().clean(function);
- preAggregator = input.getExecutionEnvironment().clean(preAggregator);
-
- String callLocation = Utils.getCallLocationName();
- String udfName = "WindowApply at " + callLocation;
-
- String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
- OneInputStreamOperator<T, R> operator;
-
- boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
- if (evictor != null) {
- operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new HeapWindowBuffer.Factory<T>(),
- function,
- trigger,
- evictor).enableSetProcessingTime(setProcessingTime);
-
- } else {
- operator = new NonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
- function,
- trigger).enableSetProcessingTime(setProcessingTime);
- }
-
- return input.transform(opName, resultType, operator).setParallelism(1);
- }
-
- // ------------------------------------------------------------------------
- // Aggregations on the windows
- // ------------------------------------------------------------------------
-
- /**
- * Applies an aggregation that sums every window of the data stream at the
- * given position.
- *
- * @param positionToSum The position in the tuple/array to sum
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
- return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that sums every window of the pojo data stream at
- * the given field for every window.
- *
- * <p>
- * A field expression is either
- * the name of a public field or a getter method with parentheses of the
- * stream's underlying type. A dot can be used to drill down into objects,
- * as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field to sum
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> sum(String field) {
- return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the minimum value of every window
- * of the data stream at the given position.
- *
- * @param positionToMin The position to minimize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
- return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the minimum value of the pojo data
- * stream at the given field expression for every window.
- *
- * <p>
- * A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream}'s underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field expression based on which the aggregation will be applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> min(String field) {
- return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the minimum element of every window of
- * the data stream by the given position. If more elements have the same
- * minimum value the operator returns the first element by default.
- *
- * @param positionToMinBy
- * The position to minimize by
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
- return this.minBy(positionToMinBy, true);
- }
-
- /**
- * Applies an aggregation that gives the minimum element of every window of
- * the data stream by the given position. If more elements have the same
- * minimum value the operator returns the first element by default.
- *
- * @param positionToMinBy The position to minimize by
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
- return this.minBy(positionToMinBy, true);
- }
-
- /**
- * Applies an aggregation that gives the minimum element of every window of
- * the data stream by the given position. If more elements have the same
- * minimum value the operator returns either the first or last one depending
- * on the parameter setting.
- *
- * @param positionToMinBy The position to minimize
- * @param first If true, then the operator returns the first element with the minimum value, otherwise returns the last
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
- return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the minimum element of the pojo
- * data stream by the given field expression for every window. A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field expression based on which the aggregation will be applied.
- * @param first If True then in case of field equality the first object will be returned
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
- return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the maximum value of every window of
- * the data stream at the given position.
- *
- * @param positionToMax The position to maximize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
- return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the maximum value of the pojo data
- * stream at the given field expression for every window. A field expression
- * is either the name of a public field or a getter method with parentheses
- * of the {@link DataStream DataStreams} underlying type. A dot can be used to drill
- * down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field expression based on which the aggregation will be applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> max(String field) {
- return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the maximum element of every window of
- * the data stream by the given position. If more elements have the same
- * maximum value the operator returns the first by default.
- *
- * @param positionToMaxBy
- * The position to maximize by
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
- return this.maxBy(positionToMaxBy, true);
- }
-
- /**
- * Applies an aggregation that gives the maximum element of every window of
- * the data stream by the given position. If more elements have the same
- * maximum value the operator returns the first by default.
- *
- * @param positionToMaxBy
- * The position to maximize by
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
- return this.maxBy(positionToMaxBy, true);
- }
-
- /**
- * Applies an aggregation that gives the maximum element of every window of
- * the data stream by the given position. If more elements have the same
- * maximum value the operator returns either the first or last one depending
- * on the parameter setting.
- *
- * @param positionToMaxBy The position to maximize by
- * @param first If true, then the operator returns the first element with the maximum value, otherwise returns the last
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
- return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the maximum element of the pojo
- * data stream by the given field expression for every window. A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream}'s underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field expression based on which the aggregation will be applied.
- * @param first If True then in case of field equality the first object will be returned
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
- return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
- }
-
- private SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregator) {
- return reduce(aggregator);
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
-
- private <R> SingleOutputStreamOperator<R, ?> createFastTimeOperatorIfValid(
- Function function,
- TypeInformation<R> resultType,
- String functionName) {
-
- // TODO: add once non-parallel fast aligned time windows operator is ready
- return null;
- }
-
- public StreamExecutionEnvironment getExecutionEnvironment() {
- return input.getExecutionEnvironment();
- }
-
- public TypeInformation<T> getInputType() {
- return input.getType();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
deleted file mode 100644
index d1da783..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- *{@code CoGroupedStreams} represents two {@link DataStream DataStreams} that have been co-grouped.
- * A streaming co-group operation is evaluated over elements in a window.
- *
- * <p>
- * To finalize co-group operation you also need to specify a {@link KeySelector} for
- * both the first and second input and a {@link WindowAssigner}.
- *
- * <p>
- * Note: Right now, the groups are being built in memory so you need to ensure that they don't
- * get too big. Otherwise the JVM might crash.
- *
- * <p>
- * Example:
- *
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> one = ...;
- * DataStream<Tuple2<String, Integer>> two = ...;
- *
- * DataStream<T> result = one.coGroup(two)
- * .where(new MyFirstKeySelector())
- * .equalTo(new MyFirstKeySelector())
- * .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
- * .apply(new MyCoGroupFunction());
- * } </pre>
- */
-public class CoGroupedStreams<T1, T2> {
-
- /** The first input stream */
- private final DataStream<T1> input1;
-
- /** The second input stream */
- private final DataStream<T2> input2;
-
- /**
- * Creates new CoGroped data streams, which are the first step towards building a streaming co-group.
- *
- * @param input1 The first data stream.
- * @param input2 The second data stream.
- */
- public CoGroupedStreams(DataStream<T1> input1, DataStream<T2> input2) {
- this.input1 = requireNonNull(input1);
- this.input2 = requireNonNull(input2);
- }
-
- /**
- * Specifies a {@link KeySelector} for elements from the first input.
- */
- public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
- TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
- return new Where<>(input1.clean(keySelector), keyType);
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * CoGrouped streams that have the key for one side defined.
- *
- * @param <KEY> The type of the key.
- */
- public class Where<KEY> {
-
- private final KeySelector<T1, KEY> keySelector1;
- private final TypeInformation<KEY> keyType;
-
- Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
- this.keySelector1 = keySelector1;
- this.keyType = keyType;
- }
-
- /**
- * Specifies a {@link KeySelector} for elements from the second input.
- */
- public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
- TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
- if (!otherKey.equals(this.keyType)) {
- throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
- "first key = " + this.keyType + " , second key = " + otherKey);
- }
-
- return new EqualTo(input2.clean(keySelector));
- }
-
- // --------------------------------------------------------------------
-
- /**
- * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs.
- */
- public class EqualTo {
-
- private final KeySelector<T2, KEY> keySelector2;
-
- EqualTo(KeySelector<T2, KEY> keySelector2) {
- this.keySelector2 = requireNonNull(keySelector2);
- }
-
- /**
- * Specifies the window on which the co-group operation works.
- */
- public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
- return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
- }
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
- * well as a {@link WindowAssigner}.
- *
- * @param <T1> Type of the elements from the first input
- * @param <T2> Type of the elements from the second input
- * @param <KEY> Type of the key. This must be the same for both inputs
- * @param <W> Type of {@link Window} on which the co-group operation works.
- */
- public static class WithWindow<T1, T2, KEY, W extends Window> {
- private final DataStream<T1> input1;
- private final DataStream<T2> input2;
-
- private final KeySelector<T1, KEY> keySelector1;
- private final KeySelector<T2, KEY> keySelector2;
-
- private final TypeInformation<KEY> keyType;
-
- private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
-
- private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
-
- private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
-
- protected WithWindow(DataStream<T1> input1,
- DataStream<T2> input2,
- KeySelector<T1, KEY> keySelector1,
- KeySelector<T2, KEY> keySelector2,
- TypeInformation<KEY> keyType,
- WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
- Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
- Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
- this.input1 = input1;
- this.input2 = input2;
-
- this.keySelector1 = keySelector1;
- this.keySelector2 = keySelector2;
- this.keyType = keyType;
-
- this.windowAssigner = windowAssigner;
- this.trigger = trigger;
- this.evictor = evictor;
- }
-
- /**
- * Sets the {@code Trigger} that should be used to trigger window emission.
- */
- public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
- return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
- windowAssigner, newTrigger, evictor);
- }
-
- /**
- * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
- *
- * <p>
- * Note: When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- */
- public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
- return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
- windowAssigner, trigger, newEvictor);
- }
-
- /**
- * Completes the co-group operation with the user function that is executed
- * for windowed groups.
- */
- public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
-
- TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
- function,
- CoGroupFunction.class,
- true,
- true,
- input1.getType(),
- input2.getType(),
- "CoGroup",
- false);
-
- return apply(function, resultType);
- }
-
- /**
- * Completes the co-group operation with the user function that is executed
- * for windowed groups.
- */
- public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
- //clean the closure
- function = input1.getExecutionEnvironment().clean(function);
-
- UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
- UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);
-
- DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
- .map(new Input1Tagger<T1, T2>())
- .returns(unionType);
- DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
- .map(new Input2Tagger<T1, T2>())
- .returns(unionType);
-
- DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
-
- // we explicitly create the keyed stream to manually pass the key type information in
- WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp =
- new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
- .window(windowAssigner);
-
- if (trigger != null) {
- windowOp.trigger(trigger);
- }
- if (evictor != null) {
- windowOp.evictor(evictor);
- }
-
- return windowOp.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
- }
- }
-
- // ------------------------------------------------------------------------
- // Data type and type information for Tagged Union
- // ------------------------------------------------------------------------
-
- /**
- * Internal class for implementing tagged union co-group.
- */
- public static class TaggedUnion<T1, T2> {
- private final T1 one;
- private final T2 two;
-
- private TaggedUnion(T1 one, T2 two) {
- this.one = one;
- this.two = two;
- }
-
- public boolean isOne() {
- return one != null;
- }
-
- public boolean isTwo() {
- return two != null;
- }
-
- public T1 getOne() {
- return one;
- }
-
- public T2 getTwo() {
- return two;
- }
-
- public static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
- return new TaggedUnion<>(one, null);
- }
-
- public static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
- return new TaggedUnion<>(null, two);
- }
- }
-
- private static class UnionTypeInfo<T1, T2> extends TypeInformation<TaggedUnion<T1, T2>> {
- private static final long serialVersionUID = 1L;
-
- TypeInformation<T1> oneType;
- TypeInformation<T2> twoType;
-
- public UnionTypeInfo(TypeInformation<T1> oneType,
- TypeInformation<T2> twoType) {
- this.oneType = oneType;
- this.twoType = twoType;
- }
-
- @Override
- public boolean isBasicType() {
- return false;
- }
-
- @Override
- public boolean isTupleType() {
- return false;
- }
-
- @Override
- public int getArity() {
- return 2;
- }
-
- @Override
- public int getTotalFields() {
- return 2;
- }
-
- @Override
- @SuppressWarnings("unchecked, rawtypes")
- public Class<TaggedUnion<T1, T2>> getTypeClass() {
- return (Class) TaggedUnion.class;
- }
-
- @Override
- public boolean isKeyType() {
- return true;
- }
-
- @Override
- public TypeSerializer<TaggedUnion<T1, T2>> createSerializer(ExecutionConfig config) {
- return new UnionSerializer<>(oneType.createSerializer(config), twoType.createSerializer(config));
- }
-
- @Override
- public String toString() {
- return "TaggedUnion<" + oneType + ", " + twoType + ">";
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof UnionTypeInfo) {
- @SuppressWarnings("unchecked")
- UnionTypeInfo<T1, T2> unionTypeInfo = (UnionTypeInfo<T1, T2>) obj;
-
- return unionTypeInfo.canEqual(this) && oneType.equals(unionTypeInfo.oneType) && twoType.equals(unionTypeInfo.twoType);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return 31 * oneType.hashCode() + twoType.hashCode();
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof UnionTypeInfo;
- }
- }
-
- private static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
- private static final long serialVersionUID = 1L;
-
- private final TypeSerializer<T1> oneSerializer;
- private final TypeSerializer<T2> twoSerializer;
-
- public UnionSerializer(TypeSerializer<T1> oneSerializer,
- TypeSerializer<T2> twoSerializer) {
- this.oneSerializer = oneSerializer;
- this.twoSerializer = twoSerializer;
- }
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public TypeSerializer<TaggedUnion<T1, T2>> duplicate() {
- return this;
- }
-
- @Override
- public TaggedUnion<T1, T2> createInstance() {
- return null;
- }
-
- @Override
- public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from) {
- if (from.isOne()) {
- return TaggedUnion.one(oneSerializer.copy(from.getOne()));
- } else {
- return TaggedUnion.two(twoSerializer.copy(from.getTwo()));
- }
- }
-
- @Override
- public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from, TaggedUnion<T1, T2> reuse) {
- if (from.isOne()) {
- return TaggedUnion.one(oneSerializer.copy(from.getOne()));
- } else {
- return TaggedUnion.two(twoSerializer.copy(from.getTwo()));
- } }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public void serialize(TaggedUnion<T1, T2> record, DataOutputView target) throws IOException {
- if (record.isOne()) {
- target.writeByte(1);
- oneSerializer.serialize(record.getOne(), target);
- } else {
- target.writeByte(2);
- twoSerializer.serialize(record.getTwo(), target);
- }
- }
-
- @Override
- public TaggedUnion<T1, T2> deserialize(DataInputView source) throws IOException {
- byte tag = source.readByte();
- if (tag == 1) {
- return TaggedUnion.one(oneSerializer.deserialize(source));
- } else {
- return TaggedUnion.two(twoSerializer.deserialize(source));
- }
- }
-
- @Override
- public TaggedUnion<T1, T2> deserialize(TaggedUnion<T1, T2> reuse,
- DataInputView source) throws IOException {
- byte tag = source.readByte();
- if (tag == 1) {
- return TaggedUnion.one(oneSerializer.deserialize(source));
- } else {
- return TaggedUnion.two(twoSerializer.deserialize(source));
- }
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- byte tag = source.readByte();
- target.writeByte(tag);
- if (tag == 1) {
- oneSerializer.copy(source, target);
- } else {
- twoSerializer.copy(source, target);
- }
- }
-
- @Override
- public int hashCode() {
- return 31 * oneSerializer.hashCode() + twoSerializer.hashCode();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public boolean equals(Object obj) {
- if (obj instanceof UnionSerializer) {
- UnionSerializer<T1, T2> other = (UnionSerializer<T1, T2>) obj;
-
- return other.canEqual(this) && oneSerializer.equals(other.oneSerializer) && twoSerializer.equals(other.twoSerializer);
- } else {
- return false;
- }
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof UnionSerializer;
- }
- }
-
- // ------------------------------------------------------------------------
- // Utility functions that implement the CoGroup logic based on the tagged
- // untion window reduce
- // ------------------------------------------------------------------------
-
- private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public TaggedUnion<T1, T2> map(T1 value) throws Exception {
- return TaggedUnion.one(value);
- }
- }
-
- private static class Input2Tagger<T1, T2> implements MapFunction<T2, TaggedUnion<T1, T2>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public TaggedUnion<T1, T2> map(T2 value) throws Exception {
- return TaggedUnion.two(value);
- }
- }
-
- private static class UnionKeySelector<T1, T2, KEY> implements KeySelector<TaggedUnion<T1, T2>, KEY> {
- private static final long serialVersionUID = 1L;
-
- private final KeySelector<T1, KEY> keySelector1;
- private final KeySelector<T2, KEY> keySelector2;
-
- public UnionKeySelector(KeySelector<T1, KEY> keySelector1,
- KeySelector<T2, KEY> keySelector2) {
- this.keySelector1 = keySelector1;
- this.keySelector2 = keySelector2;
- }
-
- @Override
- public KEY getKey(TaggedUnion<T1, T2> value) throws Exception{
- if (value.isOne()) {
- return keySelector1.getKey(value.getOne());
- } else {
- return keySelector2.getKey(value.getTwo());
- }
- }
- }
-
- private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
- extends WrappingFunction<CoGroupFunction<T1, T2, T>>
- implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
-
- private static final long serialVersionUID = 1L;
-
- public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
- super(userFunction);
- }
-
- @Override
- public void apply(KEY key,
- W window,
- Iterable<TaggedUnion<T1, T2>> values,
- Collector<T> out) throws Exception {
-
- List<T1> oneValues = new ArrayList<>();
- List<T2> twoValues = new ArrayList<>();
-
- for (TaggedUnion<T1, T2> val: values) {
- if (val.isOne()) {
- oneValues.add(val.getOne());
- } else {
- twoValues.add(val.getTwo());
- }
- }
- wrappedFunction.coGroup(oneValues, twoValues, out);
- }
- }
-}