You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:32 UTC
[16/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
deleted file mode 100644
index 08ce890..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.exampleScalaPrograms.join;
-
-import org.apache.flink.streaming.scala.examples.join.WindowJoin;
-import org.apache.flink.streaming.examples.join.util.WindowJoinData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-public class WindowJoinITCase extends StreamingProgramTestBase {
-
- protected String gradesPath;
- protected String salariesPath;
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
- salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- // since the two sides of the join might have different speed
- // the exact output cannot be checked, just whether it is well-formed
- // checks that the result lines look like e.g. Person(bob, 2, 2015)
- checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)");
- }
-
- @Override
- protected void testProgram() throws Exception {
- WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
deleted file mode 100644
index b3629ad..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.exampleScalaPrograms.socket;
-
-import org.apache.flink.streaming.scala.examples.socket.SocketTextStreamWordCount;
-import org.apache.flink.streaming.util.SocketProgramITCaseBase;
-
-public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase {
-
- @Override
- protected void testProgram() throws Exception {
- SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath});
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
deleted file mode 100644
index ef4e47f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.exampleScalaPrograms.windowing;
-
-import org.apache.flink.streaming.scala.examples.windowing.TopSpeedWindowing;
-import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
- protected String textPath;
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- setParallelism(1); //needed to ensure total ordering for windows
- textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_CASE_CLASS_SPEEDS, resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- TopSpeedWindowing.main(new String[]{textPath, resultPath});
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml b/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
deleted file mode 100644
index aa0d67e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
+++ /dev/null
@@ -1,236 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-parent</artifactId>
- <version>0.10-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-streaming-scala</artifactId>
- <name>flink-streaming-scala</name>
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.ow2.asm</groupId>
- <artifactId>asm</artifactId>
- <version>${asm.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
-
- <!-- To access general test utils -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <!-- To access test data -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- To access streaming test utils -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-core</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <!-- Scala Compiler -->
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.1.4</version>
- <executions>
- <!-- Run scala compiler in the process-resources phase, so that dependencies on
- scala classes can be resolved later in the (Java) compile phase -->
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
-
- <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
- scala classes can be resolved later in the (Java) test-compile phase -->
- <execution>
- <id>scala-test-compile</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <jvmArgs>
- <jvmArg>-Xms128m</jvmArg>
- <jvmArg>-Xmx512m</jvmArg>
- </jvmArgs>
- <compilerPlugins combine.children="append">
- <compilerPlugin>
- <groupId>org.scalamacros</groupId>
- <artifactId>paradise_${scala.version}</artifactId>
- <version>${scala.macros.version}</version>
- </compilerPlugin>
- </compilerPlugins>
- </configuration>
- </plugin>
-
- <!-- Eclipse Integration -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.8</version>
- <configuration>
- <downloadSources>true</downloadSources>
- <projectnatures>
- <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
- <projectnature>org.eclipse.jdt.core.javanature</projectnature>
- </projectnatures>
- <buildcommands>
- <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
- </buildcommands>
- <classpathContainers>
- <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
- <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
- </classpathContainers>
- <excludes>
- <exclude>org.scala-lang:scala-library</exclude>
- <exclude>org.scala-lang:scala-compiler</exclude>
- </excludes>
- <sourceIncludes>
- <sourceInclude>**/*.scala</sourceInclude>
- <sourceInclude>**/*.java</sourceInclude>
- </sourceIncludes>
- </configuration>
- </plugin>
-
- <!-- Adding scala source directories to build path -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <!-- Add src/main/scala to eclipse build path -->
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/scala</source>
- </sources>
- </configuration>
- </execution>
- <!-- Add src/test/scala to eclipse build path -->
- <execution>
- <id>add-test-source</id>
- <phase>generate-test-sources</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/test/scala</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.scalastyle</groupId>
- <artifactId>scalastyle-maven-plugin</artifactId>
- <version>0.5.0</version>
- <executions>
- <execution>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <verbose>false</verbose>
- <failOnViolation>true</failOnViolation>
- <includeTestSourceDirectory>true</includeTestSourceDirectory>
- <failOnWarning>false</failOnWarning>
- <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
- <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
- <configLocation>${project.basedir}/../../../tools/maven/scalastyle-config.xml</configLocation>
- <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
- <outputEncoding>UTF-8</outputEncoding>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
deleted file mode 100644
index 0357144..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
-import org.apache.flink.streaming.api.windowing.evictors.Evictor
-import org.apache.flink.streaming.api.windowing.triggers.Trigger
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
-
-import scala.reflect.ClassTag
-
-import scala.collection.JavaConverters._
-
-/**
- * A [[AllWindowedStream]] represents a data stream where the stream of
- * elements is split into windows based on a
- * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]. Window emission
- * is triggered based on a [[Trigger]].
- *
- * If an [[Evictor]] is specified it will be
- * used to evict elements from the window after
- * evaluation was triggered by the [[Trigger]] but before the actual evaluation of the window.
- * When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- *
- * Note that the [[AllWindowedStream()]] is purely an API construct, during runtime
- * the [[AllWindowedStream()]] will be collapsed together with the
- * operation over the window into one single operation.
- *
- * @tparam T The type of elements in the stream.
- * @tparam W The type of [[Window]] that the
- * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]
- * assigns the elements to.
- */
-class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
-
- /**
- * Sets the [[Trigger]] that should be used to trigger window emission.
- */
- def trigger(trigger: Trigger[_ >: T, _ >: W]): AllWindowedStream[T, W] = {
- javaStream.trigger(trigger)
- this
- }
-
- /**
- * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
- *
- * Note: When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- */
- def evictor(evictor: Evictor[_ >: T, _ >: W]): AllWindowedStream[T, W] = {
- javaStream.evictor(evictor)
- this
- }
-
- // ------------------------------------------------------------------------
- // Operations on the keyed windows
- // ------------------------------------------------------------------------
-
- /**
- * Applies a reduce function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the reduce function is interpreted
- * as a regular non-windowed stream.
- *
- * This window will try and pre-aggregate data as much as the window policies permit. For example,
- * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
- * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide
- * interval, so a few elements are stored per key (one per slide interval).
- * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
- * aggregation tree.
- *
- * @param function The reduce function.
- * @return The data stream that is the result of applying the reduce function to the window.
- */
- def reduce(function: ReduceFunction[T]): DataStream[T] = {
- javaStream.reduce(clean(function))
- }
-
- /**
- * Applies a reduce function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the reduce function is interpreted
- * as a regular non-windowed stream.
- *
- * This window will try and pre-aggregate data as much as the window policies permit. For example,
- * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
- * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide
- * interval, so a few elements are stored per key (one per slide interval).
- * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
- * aggregation tree.
- *
- * @param function The reduce function.
- * @return The data stream that is the result of applying the reduce function to the window.
- */
- def reduce(function: (T, T) => T): DataStream[T] = {
- if (function == null) {
- throw new NullPointerException("Reduce function must not be null.")
- }
- val cleanFun = clean(function)
- val reducer = new ReduceFunction[T] {
- def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
- }
- reduce(reducer)
- }
-
- /**
- * Applies the given fold function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the reduce function is
- * interpreted as a regular non-windowed stream.
- *
- * @param function The fold function.
- * @return The data stream that is the result of applying the fold function to the window.
- */
- def fold[R: TypeInformation: ClassTag](
- initialValue: R,
- function: FoldFunction[T,R]): DataStream[R] = {
- if (function == null) {
- throw new NullPointerException("Fold function must not be null.")
- }
-
- val resultType : TypeInformation[R] = implicitly[TypeInformation[R]]
-
- javaStream.fold(initialValue, function, resultType)
- }
-
- /**
- * Applies the given fold function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the reduce function is
- * interpreted as a regular non-windowed stream.
- *
- * @param function The fold function.
- * @return The data stream that is the result of applying the fold function to the window.
- */
- def fold[R: TypeInformation: ClassTag](initialValue: R, function: (R, T) => R): DataStream[R] = {
- if (function == null) {
- throw new NullPointerException("Fold function must not be null.")
- }
- val cleanFun = clean(function)
- val folder = new FoldFunction[T,R] {
- def fold(acc: R, v: T) = {
- cleanFun(acc, v)
- }
- }
- fold(initialValue, folder)
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * Note that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
- *
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- def apply[R: TypeInformation: ClassTag](function: AllWindowFunction[T, R, W]): DataStream[R] = {
- javaStream.apply(clean(function), implicitly[TypeInformation[R]])
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * Note that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
- *
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- def apply[R: TypeInformation: ClassTag](
- function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
- val cleanedFunction = clean(function)
- val applyFunction = new AllWindowFunction[T, R, W] {
- def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
- cleanedFunction(window, elements.asScala, out)
- }
- }
- javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * Arriving data is pre-aggregated using the given pre-aggregation reducer.
- *
- * @param preAggregator The reduce function that is used for pre-aggregation
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- def apply[R: TypeInformation: ClassTag](
- preAggregator: ReduceFunction[T],
- function: AllWindowFunction[T, R, W]): DataStream[R] = {
- javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]])
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * Arriving data is pre-aggregated using the given pre-aggregation reducer.
- *
- * @param preAggregator The reduce function that is used for pre-aggregation
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- def apply[R: TypeInformation: ClassTag](
- preAggregator: (T, T) => T,
- function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
- if (function == null) {
- throw new NullPointerException("Reduce function must not be null.")
- }
- if (function == null) {
- throw new NullPointerException("WindowApply function must not be null.")
- }
-
- val cleanReducer = clean(preAggregator)
- val reducer = new ReduceFunction[T] {
- def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
- }
-
- val cleanApply = clean(function)
- val applyFunction = new AllWindowFunction[T, R, W] {
- def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
- cleanApply(window, elements.asScala, out)
- }
- }
- javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
- }
-
- // ------------------------------------------------------------------------
- // Aggregations on the keyed windows
- // ------------------------------------------------------------------------
-
- /**
- * Applies an aggregation that gives the maximum of the elements in the window at
- * the given position.
- */
- def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-
- /**
- * Applies an aggregation that gives the maximum of the elements in the window at
- * the given field.
- */
- def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
-
- /**
- * Applies an aggregation that gives the minimum of the elements in the window at
- * the given position.
- */
- def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
-
- /**
- * Applies an aggregation that gives the minimum of the elements in the window at
- * the given field.
- */
- def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
-
- /**
- * Applies an aggregation that sums the elements in the window at the given position.
- */
- def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
-
- /**
- * Applies an aggregation that sums the elements in the window at the given field.
- */
- def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
-
- /**
- * Applies an aggregation that gives the maximum element of the window by
- * the given position. In case of equality, the first element is returned.
- */
- def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
- position)
-
- /**
- * Applies an aggregation that gives the maximum element of the window by
- * the given field. In case of equality, the first element is returned.
- */
- def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
- field)
-
- /**
- * Applies an aggregation that gives the minimum element of the window by
- * the given position. In case of equality, the first element is returned.
- */
- def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
- position)
-
- /**
- * Applies an aggregation that gives the minimum element of the window by
- * the given field. In case of equality, the first element is returned.
- */
- def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
- field)
-
- private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
- val position = fieldNames2Indices(getInputType(), Array(field))(0)
- aggregate(aggregationType, position)
- }
-
- def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
-
- val jStream = javaStream.asInstanceOf[JavaAllWStream[Product, W]]
-
- val reducer = aggregationType match {
- case AggregationType.SUM =>
- new SumAggregator(position, jStream.getInputType, jStream.getExecutionEnvironment.getConfig)
-
- case _ =>
- new ComparableAggregator(
- position,
- jStream.getInputType,
- aggregationType,
- true,
- jStream.getExecutionEnvironment.getConfig)
- }
-
- new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- /**
- * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
- * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
- */
- private[flink] def clean[F <: AnyRef](f: F): F = {
- new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
- }
-
- /**
- * Gets the type of the elements in the input stream.
- */
- private def getInputType(): TypeInformation[T] = javaStream.getInputType
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
deleted file mode 100644
index e676f81..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.common.functions.CoGroupFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.streaming.api.datastream.{CoGroupedStreams => JavaCoGroupedStreams}
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
-import org.apache.flink.streaming.api.windowing.evictors.Evictor
-import org.apache.flink.streaming.api.windowing.triggers.Trigger
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
-
-import scala.collection.JavaConverters._
-
-import scala.reflect.ClassTag
-
-/**
- * `CoGroupedStreams` represents two [[DataStream]]s that have been co-grouped.
- * A streaming co-group operation is evaluated over elements in a window.
- *
- * To finalize the co-group operation you also need to specify a [[KeySelector]] for
- * both the first and second input and a [[WindowAssigner]]
- *
- * Note: Right now, the groups are being built in memory so you need to ensure that they don't
- * get too big. Otherwise the JVM might crash.
- *
- * Example:
- *
- * {{{
- * val one: DataStream[(String, Int)] = ...
- * val two: DataStream[(String, Int)] = ...
- *
- * val result = one.coGroup(two)
- * .where(new MyFirstKeySelector())
- * .equalTo(new MyFirstKeySelector())
- * .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
- * .apply(new MyCoGroupFunction())
- * } }}}
- */
-object CoGroupedStreams {
-
- /**
- * A co-group operation that does not yet have its [[KeySelector]]s defined.
- *
- * @tparam T1 Type of the elements from the first input
- * @tparam T2 Type of the elements from the second input
- */
- class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
-
- /**
- * Specifies a [[KeySelector]] for elements from the first input.
- */
- def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = {
- val cleanFun = clean(keySelector)
- val keyType = implicitly[TypeInformation[KEY]]
- val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
- def getKey(in: T1) = cleanFun(in)
- override def getProducedType: TypeInformation[KEY] = keyType
- }
- new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType)
- }
-
- /**
- * Specifies a [[KeySelector]] for elements from the second input.
- */
- def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, KEY] = {
- val cleanFun = clean(keySelector)
- val keyType = implicitly[TypeInformation[KEY]]
- val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
- def getKey(in: T2) = cleanFun(in)
- override def getProducedType: TypeInformation[KEY] = keyType
- }
- new WithKey[T1, T2, KEY](input1, input2, null, javaSelector, keyType)
- }
-
- /**
- * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
- * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
- */
- private[flink] def clean[F <: AnyRef](f: F): F = {
- new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
- }
- }
-
- /**
- * A co-group operation that has [[KeySelector]]s defined for either both or
- * one input.
- *
- * You need to specify a [[KeySelector]] for both inputs using [[where()]] and [[equalTo()]]
- * before you can proceeed with specifying a [[WindowAssigner]] using [[window()]].
- *
- * @tparam T1 Type of the elements from the first input
- * @tparam T2 Type of the elements from the second input
- * @tparam KEY Type of the key. This must be the same for both inputs
- */
- class WithKey[T1, T2, KEY](
- input1: DataStream[T1],
- input2: DataStream[T2],
- keySelector1: KeySelector[T1, KEY],
- keySelector2: KeySelector[T2, KEY],
- keyType: TypeInformation[KEY]) {
-
- /**
- * Specifies a [[KeySelector]] for elements from the first input.
- */
- def where(keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
- val cleanFun = clean(keySelector)
- val localKeyType = keyType
- val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
- def getKey(in: T1) = cleanFun(in)
- override def getProducedType: TypeInformation[KEY] = localKeyType
- }
- new WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2, keyType)
- }
-
- /**
- * Specifies a [[KeySelector]] for elements from the second input.
- */
- def equalTo(keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
- val cleanFun = clean(keySelector)
- val localKeyType = keyType
- val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
- def getKey(in: T2) = cleanFun(in)
- override def getProducedType: TypeInformation[KEY] = localKeyType
- }
- new WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector, keyType)
- }
-
- /**
- * Specifies the window on which the co-group operation works.
- */
- def window[W <: Window](
- assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W])
- : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
- if (keySelector1 == null || keySelector2 == null) {
- throw new UnsupportedOperationException("You first need to specify KeySelectors for both" +
- "inputs using where() and equalTo().")
- }
- new CoGroupedStreams.WithWindow[T1, T2, KEY, W](
- input1,
- input2,
- keySelector1,
- keySelector2,
- clean(assigner),
- null,
- null)
- }
-
- /**
- * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
- * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
- */
- private[flink] def clean[F <: AnyRef](f: F): F = {
- new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
- }
- }
-
- /**
- * A co-group operation that has [[KeySelector]]s defined for both inputs as
- * well as a [[WindowAssigner]].
- *
- * @tparam T1 Type of the elements from the first input
- * @tparam T2 Type of the elements from the second input
- * @tparam KEY Type of the key. This must be the same for both inputs
- * @tparam W Type of { @link Window} on which the co-group operation works.
- */
- class WithWindow[T1, T2, KEY, W <: Window](
- input1: DataStream[T1],
- input2: DataStream[T2],
- keySelector1: KeySelector[T1, KEY],
- keySelector2: KeySelector[T2, KEY],
- windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
- trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
- evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
-
-
- /**
- * Sets the [[Trigger]] that should be used to trigger window emission.
- */
- def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
- : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
- new WithWindow[T1, T2, KEY, W](
- input1,
- input2,
- keySelector1,
- keySelector2,
- windowAssigner,
- newTrigger,
- evictor)
- }
-
- /**
- * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
- *
- * Note: When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- */
- def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
- : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
- new WithWindow[T1, T2, KEY, W](
- input1,
- input2,
- keySelector1,
- keySelector2,
- windowAssigner,
- trigger,
- newEvictor)
- }
-
- /**
- * Completes the co-group operation with the user function that is executed
- * for windowed groups.
- */
- def apply[O: TypeInformation: ClassTag](
- fun: (Iterator[T1], Iterator[T2]) => O): DataStream[O] = {
- require(fun != null, "CoGroup function must not be null.")
-
- val coGrouper = new CoGroupFunction[T1, T2, O] {
- val cleanFun = clean(fun)
- def coGroup(
- left: java.lang.Iterable[T1],
- right: java.lang.Iterable[T2], out: Collector[O]) = {
- out.collect(cleanFun(left.iterator().asScala, right.iterator().asScala))
- }
- }
- apply(coGrouper)
- }
-
- /**
- * Completes the co-group operation with the user function that is executed
- * for windowed groups.
- */
- def apply[O: TypeInformation: ClassTag](
- fun: (Iterator[T1], Iterator[T2], Collector[O]) => Unit): DataStream[O] = {
- require(fun != null, "CoGroup function must not be null.")
-
- val coGrouper = new CoGroupFunction[T1, T2, O] {
- val cleanFun = clean(fun)
- def coGroup(
- left: java.lang.Iterable[T1],
- right: java.lang.Iterable[T2], out: Collector[O]) = {
- cleanFun(left.iterator.asScala, right.iterator.asScala, out)
- }
- }
- apply(coGrouper)
- }
-
- /**
- * Completes the co-group operation with the user function that is executed
- * for windowed groups.
- */
- def apply[T: TypeInformation](function: CoGroupFunction[T1, T2, T]): DataStream[T] = {
-
- val coGroup = new JavaCoGroupedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream)
-
- coGroup
- .where(keySelector1)
- .equalTo(keySelector2)
- .window(windowAssigner)
- .trigger(trigger)
- .evictor(evictor)
- .apply(clean(function), implicitly[TypeInformation[T]])
- }
-
- /**
- * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
- * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
- */
- private[flink] def clean[F <: AnyRef](f: F): F = {
- new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
- }
- }
-
-
- /**
- * Creates a new co-group operation from the two given inputs.
- */
- def createCoGroup[T1, T2](input1: DataStream[T1], input2: DataStream[T2])
- : CoGroupedStreams.Unspecified[T1, T2] = {
- new CoGroupedStreams.Unspecified[T1, T2](input1, input2)
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
deleted file mode 100644
index 3ff773f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.{TypeExtractor, ResultTypeQueryable}
-import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream, KeyedStream => JKeyedStream}
-import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
-import org.apache.flink.util.Collector
-import scala.reflect.ClassTag
-
-/**
- * [[ConnectedStreams]] represents two connected streams of (possible) different data types. It
- * can be used to apply transformations such as [[CoMapFunction]] on two
- * [[DataStream]]s.
- */
-class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
-
- /**
- * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
- * the output to a common type. The transformation calls a
- * @param fun1 for each element of the first input and
- * @param fun2 for each element of the second input. Each
- * CoMapFunction call returns exactly one element.
- *
- * The CoMapFunction used to jointly transform the two input
- * DataStreams
- * @return The transformed { @link DataStream}
- */
- def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R):
- DataStream[R] = {
- if (fun1 == null || fun2 == null) {
- throw new NullPointerException("Map function must not be null.")
- }
- val cleanFun1 = clean(fun1)
- val cleanFun2 = clean(fun2)
- val comapper = new CoMapFunction[IN1, IN2, R] {
- def map1(in1: IN1): R = cleanFun1(in1)
- def map2(in2: IN2): R = cleanFun2(in2)
- }
-
- map(comapper)
- }
-
- /**
- * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
- * the output to a common type. The transformation calls a
- * {@link CoMapFunction#map1} for each element of the first input and
- * {@link CoMapFunction#map2} for each element of the second input. Each
- * CoMapFunction call returns exactly one element. The user can also extend
- * {@link RichCoMapFunction} to gain access to other features provided by
- * the {@link RichFuntion} interface.
- *
- * @param coMapper
- * The CoMapFunction used to jointly transform the two input
- * DataStreams
- * @return The transformed { @link DataStream}
- */
- def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]):
- DataStream[R] = {
- if (coMapper == null) {
- throw new NullPointerException("Map function must not be null.")
- }
-
- val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
- javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]]
- }
-
- /**
- * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
- * maps the output to a common type. The transformation calls a
- * {@link CoFlatMapFunction#flatMap1} for each element of the first input
- * and {@link CoFlatMapFunction#flatMap2} for each element of the second
- * input. Each CoFlatMapFunction call returns any number of elements
- * including none. The user can also extend {@link RichFlatMapFunction} to
- * gain access to other features provided by the {@link RichFuntion}
- * interface.
- *
- * @param coFlatMapper
- * The CoFlatMapFunction used to jointly transform the two input
- * DataStreams
- * @return The transformed { @link DataStream}
- */
- def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]):
- DataStream[R] = {
- if (coFlatMapper == null) {
- throw new NullPointerException("FlatMap function must not be null.")
- }
-
- val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
- javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]]
- }
-
- /**
- * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
- * maps the output to a common type. The transformation calls a
- * @param fun1 for each element of the first input
- * and @param fun2 for each element of the second
- * input. Each CoFlatMapFunction call returns any number of elements
- * including none.
- *
- * @return The transformed { @link DataStream}
- */
- def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit,
- fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
- if (fun1 == null || fun2 == null) {
- throw new NullPointerException("FlatMap functions must not be null.")
- }
- val cleanFun1 = clean(fun1)
- val cleanFun2 = clean(fun2)
- val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
- def flatMap1(value: IN1, out: Collector[R]): Unit = cleanFun1(value, out)
- def flatMap2(value: IN2, out: Collector[R]): Unit = cleanFun2(value, out)
- }
- flatMap(flatMapper)
- }
-
- /**
- * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
- * maps the output to a common type. The transformation calls a
- * @param fun1 for each element of the first input
- * and @param fun2 for each element of the second
- * input. Each CoFlatMapFunction call returns any number of elements
- * including none.
- *
- * @return The transformed { @link DataStream}
- */
- def flatMap[R: TypeInformation: ClassTag](fun1: IN1 => TraversableOnce[R],
- fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
- if (fun1 == null || fun2 == null) {
- throw new NullPointerException("FlatMap functions must not be null.")
- }
- val cleanFun1 = clean(fun1)
- val cleanFun2 = clean(fun2)
- val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
- def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value) foreach out.collect }
- def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value) foreach out.collect }
- }
- flatMap(flatMapper)
- }
-
- /**
- * GroupBy operation for connected data stream. Groups the elements of
- * input1 and input2 according to keyPosition1 and keyPosition2. Used for
- * applying function on grouped data streams for example
- * {@link ConnectedStreams#reduce}
- *
- * @param keyPosition1
- * The field used to compute the hashcode of the elements in the
- * first input stream.
- * @param keyPosition2
- * The field used to compute the hashcode of the elements in the
- * second input stream.
- * @return @return The transformed { @link ConnectedStreams}
- */
- def keyBy(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
- javaStream.keyBy(keyPosition1, keyPosition2)
- }
-
- /**
- * GroupBy operation for connected data stream. Groups the elements of
- * input1 and input2 according to keyPositions1 and keyPositions2. Used for
- * applying function on grouped data streams for example
- * {@link ConnectedStreams#reduce}
- *
- * @param keyPositions1
- * The fields used to group the first input stream.
- * @param keyPositions2
- * The fields used to group the second input stream.
- * @return @return The transformed { @link ConnectedStreams}
- */
- def keyBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
- ConnectedStreams[IN1, IN2] = {
- javaStream.keyBy(keyPositions1, keyPositions2)
- }
-
- /**
- * GroupBy operation for connected data stream using key expressions. Groups
- * the elements of input1 and input2 according to field1 and field2. A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream}S underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field1
- * The grouping expression for the first input
- * @param field2
- * The grouping expression for the second input
- * @return The grouped { @link ConnectedStreams}
- */
- def keyBy(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
- javaStream.keyBy(field1, field2)
- }
-
- /**
- * GroupBy operation for connected data stream using key expressions. Groups
- * the elements of input1 and input2 according to fields1 and fields2. A
- * field expression is either the name of a public field or a getter method
- * with parentheses of the {@link DataStream}S underlying type. A dot can be
- * used to drill down into objects, as in {@code "field1.getInnerField2()" }
- * .
- *
- * @param fields1
- * The grouping expressions for the first input
- * @param fields2
- * The grouping expressions for the second input
- * @return The grouped { @link ConnectedStreams}
- */
- def keyBy(fields1: Array[String], fields2: Array[String]):
- ConnectedStreams[IN1, IN2] = {
- javaStream.keyBy(fields1, fields2)
- }
-
- /**
- * GroupBy operation for connected data stream. Groups the elements of
- * input1 and input2 using fun1 and fun2. Used for applying
- * function on grouped data streams for example
- * {@link ConnectedStreams#reduce}
- *
- * @param fun1
- * The function used for grouping the first input
- * @param fun2
- * The function used for grouping the second input
- * @return The grouped { @link ConnectedStreams}
- */
- def keyBy[K1: TypeInformation, K2: TypeInformation](fun1: IN1 => K1, fun2: IN2 => K2):
- ConnectedStreams[IN1, IN2] = {
-
- val keyType1 = implicitly[TypeInformation[K1]]
- val keyType2 = implicitly[TypeInformation[K2]]
-
- val cleanFun1 = clean(fun1)
- val cleanFun2 = clean(fun2)
-
- val keyExtractor1 = new KeySelectorWithType[IN1, K1](cleanFun1, keyType1)
- val keyExtractor2 = new KeySelectorWithType[IN2, K2](cleanFun2, keyType2)
-
- javaStream.keyBy(keyExtractor1, keyExtractor2)
- }
-
- /**
- * PartitionBy operation for connected data stream. Partitions the elements of
- * input1 and input2 according to keyPosition1 and keyPosition2.
- *
- * @param keyPosition1
- * The field used to compute the hashcode of the elements in the
- * first input stream.
- * @param keyPosition2
- * The field used to compute the hashcode of the elements in the
- * second input stream.
- * @return The transformed { @link ConnectedStreams}
- */
- def partitionByHash(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
- javaStream.partitionByHash(keyPosition1, keyPosition2)
- }
-
- /**
- * PartitionBy operation for connected data stream. Partitions the elements of
- * input1 and input2 according to keyPositions1 and keyPositions2.
- *
- * @param keyPositions1
- * The fields used to partition the first input stream.
- * @param keyPositions2
- * The fields used to partition the second input stream.
- * @return The transformed { @link ConnectedStreams}
- */
- def partitionByHash(keyPositions1: Array[Int], keyPositions2: Array[Int]):
- ConnectedStreams[IN1, IN2] = {
- javaStream.partitionByHash(keyPositions1, keyPositions2)
- }
-
- /**
- * PartitionBy operation for connected data stream using key expressions. Partitions
- * the elements of input1 and input2 according to field1 and field2. A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream}S underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field1
- * The partitioning expression for the first input
- * @param field2
- * The partitioning expression for the second input
- * @return The grouped { @link ConnectedStreams}
- */
- def partitionByHash(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
- javaStream.partitionByHash(field1, field2)
- }
-
- /**
- * PartitionBy operation for connected data stream using key expressions. Partitions
- * the elements of input1 and input2 according to fields1 and fields2.
- *
- * @param fields1
- * The partitioning expressions for the first input
- * @param fields2
- * The partitioning expressions for the second input
- * @return The partitioned { @link ConnectedStreams}
- */
- def partitionByHash(fields1: Array[String], fields2: Array[String]):
- ConnectedStreams[IN1, IN2] = {
- javaStream.partitionByHash(fields1, fields2)
- }
-
- /**
- * PartitionBy operation for connected data stream. Partitions the elements of
- * input1 and input2 using fun1 and fun2.
- *
- * @param fun1
- * The function used for partitioning the first input
- * @param fun2
- * The function used for partitioning the second input
- * @return The partitioned { @link ConnectedStreams}
- */
- def partitionByHash[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
- ConnectedStreams[IN1, IN2] = {
-
- val cleanFun1 = clean(fun1)
- val cleanFun2 = clean(fun2)
-
- val keyExtractor1 = new KeySelector[IN1, K] {
- def getKey(in: IN1) = cleanFun1(in)
- }
- val keyExtractor2 = new KeySelector[IN2, L] {
- def getKey(in: IN2) = cleanFun2(in)
- }
-
- javaStream.partitionByHash(keyExtractor1, keyExtractor2)
- }
-
- /**
- * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
- * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
- */
- private[flink] def clean[F <: AnyRef](f: F): F = {
- new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
- }
-
-}
-
-class KeySelectorWithType[IN, K](
- private[this] val fun: IN => K,
- private[this] val info: TypeInformation[K])
- extends KeySelector[IN, K] with ResultTypeQueryable[K] {
-
- override def getKey(value: IN): K = fun(value)
-
- override def getProducedType: TypeInformation[K] = info
-}