You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:34 UTC
[18/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
deleted file mode 100644
index 563cf01..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
+++ /dev/null
@@ -1,535 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-parent</artifactId>
- <version>0.10-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-streaming-examples</artifactId>
- <name>flink-streaming-examples</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-scala</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java-examples</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-twitter</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-core</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <!-- get default data from flink-java-examples package -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>2.9</version><!--$NO-MVN-MAN-VER$-->
- <executions>
- <execution>
- <id>unpack</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>unpack</goal>
- </goals>
- <configuration>
- <artifactItems>
- <!-- For WordCount example data -->
- <artifactItem>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java-examples</artifactId>
- <version>${project.version}</version>
- <type>jar</type>
- <overWrite>false</overWrite>
- <outputDirectory>${project.build.directory}/classes</outputDirectory>
- <includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
- </artifactItem>
- <!-- For JSON utilities -->
- <artifactItem>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-twitter</artifactId>
- <version>${project.version}</version>
- <type>jar</type>
- <overWrite>false</overWrite>
- <outputDirectory>${project.build.directory}/classes</outputDirectory>
- <includes>org/apache/flink/streaming/connectors/json/*</includes>
- </artifactItem>
- </artifactItems>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- self-contained jars for each example -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>2.4</version><!--$NO-MVN-MAN-VER$-->
- <executions>
- <!-- Default Execution -->
- <execution>
- <id>default</id>
- <phase>package</phase>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
-
- <!-- Iteration -->
- <execution>
- <id>Iteration</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>Iteration</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.streaming.examples.iteration.IterateExample</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <include>org/apache/flink/streaming/examples/iteration/*.class</include>
- </includes>
- </configuration>
- </execution>
-
- <!-- IncrementalLearning -->
- <execution>
- <id>IncrementalLearning</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>IncrementalLearning</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <include>org/apache/flink/streaming/examples/ml/*.class</include>
- </includes>
- </configuration>
- </execution>
-
- <!-- Twitter -->
- <execution>
- <id>Twitter</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>Twitter</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.streaming.examples.twitter.TwitterStream</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <include>org/apache/flink/streaming/examples/twitter/*.class</include>
- <include>org/apache/flink/streaming/examples/twitter/util/*.class</include>
- <include>org/apache/flink/streaming/connectors/json/*.class</include>
- </includes>
- </configuration>
- </execution>
-
- <!-- WindowJoin -->
- <execution>
- <id>WindowJoin</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>WindowJoin</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.streaming.examples.join.WindowJoin</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <include>org/apache/flink/streaming/examples/join/*.class</include>
- </includes>
- </configuration>
- </execution>
-
- <!-- WordCountPOJO -->
- <execution>
- <id>WordCountPOJO</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>WordCountPOJO</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
- <include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
- <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
- </includes>
- </configuration>
- </execution>
-
- <!-- WordCount -->
- <execution>
- <id>WordCount</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>WordCount</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.streaming.examples.wordcount.WordCount</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
- <include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
- <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
- </includes>
- </configuration>
- </execution>
-
- <!-- WindowWordCount -->
- <execution>
- <id>WindowWordCount</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>WindowWordCount</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.streaming.examples.windowing.WindowWordCount</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <include>org/apache/flink/streaming/examples/windowing/WindowWordCount.class</include>
- <include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
- <include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
- <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
- </includes>
- </configuration>
- </execution>
-
- <!-- SocketTextStreamWordCount -->
- <execution>
- <id>SocketTextStreamWordCount</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>SocketTextStreamWordCount</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <include>org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.class</include>
- <include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
- <include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
- </includes>
- </configuration>
- </execution>
-
- <!-- TopSpeedWindowing -->
- <execution>
- <id>TopSpeedWindowing</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>TopSpeedWindowing</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.streaming.examples.windowing.TopSpeedWindowing</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.class</include>
- <include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing$*.class</include>
- </includes>
- </configuration>
- </execution>
-
- <!-- SessionWindowing -->
- <execution>
- <id>SessionWindowing</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>SessionWindowing</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.streaming.examples.windowing.SessionWindowing</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <include>org/apache/flink/streaming/examples/windowing/SessionWindowing.class</include>
- <include>org/apache/flink/streaming/examples/windowing/SessionWindowing$*.class</include>
- </includes>
- </configuration>
- </execution>
-
- </executions>
- </plugin>
-
-
- <!-- Scala Compiler -->
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.1.4</version>
- <executions>
- <!-- Run scala compiler in the process-resources phase, so that dependencies on
- scala classes can be resolved later in the (Java) compile phase -->
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
-
- <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
- scala classes can be resolved later in the (Java) test-compile phase -->
- <execution>
- <id>scala-test-compile</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <jvmArgs>
- <jvmArg>-Xms128m</jvmArg>
- <jvmArg>-Xmx512m</jvmArg>
- </jvmArgs>
- </configuration>
- </plugin>
-
- <!-- Eclipse Integration -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.8</version>
- <configuration>
- <downloadSources>true</downloadSources>
- <projectnatures>
- <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
- <projectnature>org.eclipse.jdt.core.javanature</projectnature>
- </projectnatures>
- <buildcommands>
- <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
- </buildcommands>
- <classpathContainers>
- <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
- <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
- </classpathContainers>
- <excludes>
- <exclude>org.scala-lang:scala-library</exclude>
- <exclude>org.scala-lang:scala-compiler</exclude>
- </excludes>
- <sourceIncludes>
- <sourceInclude>**/*.scala</sourceInclude>
- <sourceInclude>**/*.java</sourceInclude>
- </sourceIncludes>
- </configuration>
- </plugin>
-
- <!-- Adding scala source directories to build path -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <!-- Add src/main/scala to eclipse build path -->
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/scala</source>
- </sources>
- </configuration>
- </execution>
- <!-- Add src/test/scala to eclipse build path -->
- <execution>
- <id>add-test-source</id>
- <phase>generate-test-sources</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/test/scala</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.scalastyle</groupId>
- <artifactId>scalastyle-maven-plugin</artifactId>
- <version>0.5.0</version>
- <executions>
- <execution>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <verbose>false</verbose>
- <failOnViolation>true</failOnViolation>
- <includeTestSourceDirectory>true</includeTestSourceDirectory>
- <failOnWarning>false</failOnWarning>
- <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
- <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
- <configLocation>${project.basedir}/../../../tools/maven/scalastyle-config.xml</configLocation>
- <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
- <outputEncoding>UTF-8</outputEncoding>
- </configuration>
- </plugin>
- </plugins>
-
- <pluginManagement>
- <plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <versionRange>[2.9,)</versionRange>
- <goals>
- <goal>unpack</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
-
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
deleted file mode 100644
index 2cf66b9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.iteration;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.IterativeStream;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Example illustrating iterations in Flink streaming. Each input pair seeds a Fibonacci-like sequence; the iteration
- * keeps adding the last two values and counts the steps until one of them reaches {@code BOUND}.
- * <p>
- * The input pairs are read from a text file when an input path is given, and generated randomly otherwise.
- * This example shows how to use: <ul> <li>streaming iterations, <li>buffer timeout to enhance latency, <li>directed
- * outputs. </ul>
- */
-public class IterateExample {
-
-    private static final int BOUND = 100;
-
-    // *************************************************************************
-    // PROGRAM
-    // *************************************************************************
-
-    public static void main(String[] args) throws Exception {
-
-        if (!parseParameters(args)) {
-            return;
-        }
-
-        // parseParameters has recorded whether files or the defaults are used
-
-        // obtain execution environment and set setBufferTimeout to 1 to enable
-        // continuous flushing of the output buffers (lowest latency)
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
-                .setBufferTimeout(1);
-
-        // create input stream of integer pairs
-        DataStream<Tuple2<Integer, Integer>> inputStream;
-        if (fileInput) {
-            inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap());
-        } else {
-            inputStream = env.addSource(new RandomFibonacciSource());
-        }
-
-        // create an iterative data stream from the input with 5 second timeout
-        IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it = inputStream.map(new InputMap())
-                .iterate(5000);
-
-        // apply the step function to get the next Fibonacci number
-        // increment the counter and split the output with the output selector
-        SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new Step())
-                .split(new MySelector());
-
-        // close the iteration by selecting the tuples that were directed to the
-        // 'iterate' channel in the output selector
-        it.closeWith(step.select("iterate"));
-
-        // to produce the final output select the tuples directed to the
-        // 'output' channel and map each of them to its original input pair
-        // together with the number of iteration steps it went through
-        DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output")
-                .map(new OutputMap());
-
-        // emit results
-        if (fileOutput) {
-            numbers.writeAsText(outputPath, 1);
-        } else {
-            numbers.print();
-        }
-
-        // execute the program
-        env.execute("Streaming Iteration Example");
-    }
-
-    // *************************************************************************
-    // USER FUNCTIONS
-    // *************************************************************************
-
-    /**
-     * Generate BOUND random integer pairs, both values in the range from 1 to BOUND/2 - 1, pausing 50 ms after each
-     */
-    private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
-        private static final long serialVersionUID = 1L;
-
-        private Random rnd = new Random();
-
-        private volatile boolean isRunning = true;
-        private int counter = 0;
-
-        @Override
-        public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-
-            while (isRunning && counter < BOUND) {
-                int first = rnd.nextInt(BOUND / 2 - 1) + 1;
-                int second = rnd.nextInt(BOUND / 2 - 1) + 1;
-
-                ctx.collect(new Tuple2<Integer, Integer>(first, second));
-                counter++;
-                Thread.sleep(50L);
-            }
-        }
-
-        @Override
-        public void cancel() {
-            isRunning = false;
-        }
-    }
-
-    /**
-     * Parse a text line of the form "(first,second)" into an integer pair
-     */
-    private static class FibonacciInputMap implements MapFunction<String, Tuple2<Integer, Integer>> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public Tuple2<Integer, Integer> map(String value) throws Exception {
-            String record = value.substring(1, value.length() - 1);
-            String[] splitted = record.split(",");
-            return new Tuple2<Integer, Integer>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
-        }
-    }
-
-    /**
-     * Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple. A
-     * counter is attached to the tuple and incremented in every iteration step.
-     */
-    public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer,
-            Integer, Integer>> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws
-                Exception {
-            return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f0, value.f1, 0);
-        }
-    }
-
-    /**
-     * Iteration step function: shifts (f2, f3) to (f3, f2 + f3) and raises the step counter f4 by one
-     */
-    public static class Step implements
-            MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer,
-                    Integer, Integer>> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,
-                Integer> value) throws Exception {
-            return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 +
-                    value.f3, value.f4 + 1); // plain addition: the input tuple is left untouched
-        }
-    }
-
-    /**
-     * OutputSelector testing which tuple needs to be iterated again.
-     */
-    public static class MySelector implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) {
-            List<String> output = new ArrayList<String>();
-            if (value.f2 < BOUND && value.f3 < BOUND) {
-                output.add("iterate");
-            } else {
-                output.add("output");
-            }
-            return output;
-        }
-    }
-
-    /**
-     * Giving back the input pair and the counter
-     */
-    public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
-            Tuple2<Tuple2<Integer, Integer>, Integer>> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer>
-                value) throws
-                Exception {
-            return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1),
-                    value.f4);
-        }
-    }
-
-    // *************************************************************************
-    // UTIL METHODS
-    // *************************************************************************
-
-    private static boolean fileInput = false;
-    private static boolean fileOutput = false;
-    private static String inputPath;
-    private static String outputPath;
-
-    private static boolean parseParameters(String[] args) {
-
-        if (args.length > 0) {
-            // parse input arguments
-            if (args.length == 1) { // <result path> only: generated input, file output
-                fileOutput = true;
-                outputPath = args[0];
-            } else if (args.length == 2) { // <input path> <result path>
-                fileInput = true;
-                inputPath = args[0];
-                fileOutput = true;
-                outputPath = args[1];
-            } else {
-                System.err.println("Usage: IterateExample [<input path>] <result path>");
-                return false;
-            }
-        } else {
-            System.out.println("Executing IterateExample with generated data.");
-            System.out.println(" Provide parameter to write to file.");
-            System.out.println(" Usage: IterateExample [<input path>] <result path>");
-        }
-        return true;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
deleted file mode 100644
index 0077459..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.iteration.util;
-
-public class IterateExampleData { // Sample data for IterateExample; INPUT_PAIRS holds 18 "(first,second)" text lines.
- public static final String INPUT_PAIRS = "(1,40)\n" + "(29,38)\n" + "(11,15)\n" + "(17,39)\n" + "(24,41)\n" +
- "(7,33)\n" + "(20,2)\n" + "(11,5)\n" + "(3,16)\n" + "(23,36)\n" + "(15,23)\n" + "(28,13)\n" + "(1,1)\n" +
- "(10,6)\n" + "(21,5)\n" + "(14,36)\n" + "(17,15)\n" + "(7,9)";
- // RESULTS: one "((first,second),steps)" line per input pair (e.g. (1,1) takes 10 steps), not in INPUT_PAIRS order.
- public static final String RESULTS = "((1,40),3)\n" + "((24,41),2)\n" + "((3,16),5)\n" + "((1,1),10)\n" +
- "((17,15),4)\n" + "((29,38),2)\n" + "((7,33),3)\n" + "((23,36),3)\n" + "((10,6),6)\n" + "((7,9),5)\n" +
- "((11,15),4)\n" + "((20,2),5)\n" + "((15,23),4)\n" + "((21,5),5)\n" +
- "((17,39),3)\n" + "((11,5),6)\n" + "((28,13),4)\n" + "((14,36),3)";
- // Private constructor: this class only holds constants.
- private IterateExampleData() {
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
deleted file mode 100644
index 3355f1c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Example illustrating join over sliding windows of streams in Flink.
- *
- * <p>
- * This example will join two streams with a sliding window. One which emits grades and one which
- * emits salaries of people. The input format for both sources has an additional timestamp
- * as field 0. This is used to to event-time windowing. Time timestamps must be
- * monotonically increasing.
- *
- * This example shows how to:
- * <ul>
- * <li>do windowed joins,
- * <li>use tuple data types,
- * <li>write a simple streaming program.
- * </ul>
- */
-public class WindowJoin {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- // obtain execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- // connect to the data sources for grades and salaries
- Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> input = getInputStreams(env);
- DataStream<Tuple3<Long, String, Integer>> grades = input.f0;
- DataStream<Tuple3<Long, String, Integer>> salaries = input.f1;
-
- // extract the timestamps
- grades = grades.assignTimestamps(new MyTimestampExtractor());
- salaries = salaries.assignTimestamps(new MyTimestampExtractor());
-
- // apply a temporal join over the two stream based on the names over one
- // second windows
- DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades
- .join(salaries)
- .where(new NameKeySelector())
- .equalTo(new NameKeySelector())
- .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.MILLISECONDS)))
- .apply(new MyJoinFunction());
-
- // emit result
- if (fileOutput) {
- joinedStream.writeAsText(outputPath, 1);
- } else {
- joinedStream.print();
- }
-
- // execute program
- env.execute("Windowed Join Example");
- }
-
- // *************************************************************************
- // USER FUNCTIONS
- // *************************************************************************
-
- private final static String[] names = {"tom", "jerry", "alice", "bob", "john", "grace"};
- private final static int GRADE_COUNT = 5;
- private final static int SALARY_MAX = 10000;
- private final static int SLEEP_TIME = 10;
-
- /**
- * Continuously emit tuples with random names and integers (grades).
- */
- public static class GradeSource implements SourceFunction<Tuple3<Long, String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- private Random rand;
- private Tuple3<Long, String, Integer> outTuple;
- private volatile boolean isRunning = true;
- private int counter;
-
- public GradeSource() {
- rand = new Random();
- outTuple = new Tuple3<>();
- }
-
- @Override
- public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
- while (isRunning && counter < 100) {
- outTuple.f0 = System.currentTimeMillis();
- outTuple.f1 = names[rand.nextInt(names.length)];
- outTuple.f2 = rand.nextInt(GRADE_COUNT) + 1;
- Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
- counter++;
- ctx.collect(outTuple);
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
- }
-
- /**
- * Continuously emit tuples with random names and integers (salaries).
- */
- public static class SalarySource extends RichSourceFunction<Tuple3<Long, String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- private transient Random rand;
- private transient Tuple3<Long, String, Integer> outTuple;
- private volatile boolean isRunning;
- private int counter;
-
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- rand = new Random();
- outTuple = new Tuple3<Long, String, Integer>();
- isRunning = true;
- }
-
-
- @Override
- public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
- while (isRunning && counter < 100) {
- outTuple.f0 = System.currentTimeMillis();
- outTuple.f1 = names[rand.nextInt(names.length)];
- outTuple.f2 = rand.nextInt(SALARY_MAX) + 1;
- Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
- counter++;
- ctx.collect(outTuple);
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
- }
-
- public static class MySourceMap extends RichMapFunction<String, Tuple3<Long, String, Integer>> {
-
- private static final long serialVersionUID = 1L;
-
- private String[] record;
-
- public MySourceMap() {
- record = new String[2];
- }
-
- @Override
- public Tuple3<Long, String, Integer> map(String line) throws Exception {
- record = line.substring(1, line.length() - 1).split(",");
- return new Tuple3<>(Long.parseLong(record[0]), record[1], Integer.parseInt(record[2]));
- }
- }
-
- public static class MyJoinFunction
- implements
- JoinFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, Tuple3<String, Integer, Integer>> {
-
- private static final long serialVersionUID = 1L;
-
- private Tuple3<String, Integer, Integer> joined = new Tuple3<>();
-
- @Override
- public Tuple3<String, Integer, Integer> join(Tuple3<Long, String, Integer> first,
- Tuple3<Long, String, Integer> second) throws Exception {
- joined.f0 = first.f1;
- joined.f1 = first.f2;
- joined.f2 = second.f2;
- return joined;
- }
- }
-
- private static class MyTimestampExtractor implements TimestampExtractor<Tuple3<Long, String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public long extractTimestamp(Tuple3<Long, String, Integer> element, long currentTimestamp) {
- return element.f0;
- }
-
- @Override
- public long extractWatermark(Tuple3<Long, String, Integer> element, long currentTimestamp) {
- return element.f0 - 1;
- }
-
- @Override
- public long getCurrentWatermark() {
- return Long.MIN_VALUE;
- }
- }
-
- private static class NameKeySelector implements KeySelector<Tuple3<Long, String, Integer>, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String getKey(Tuple3<Long, String, Integer> value) throws Exception {
- return value.f1;
- }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileInput = false;
- private static boolean fileOutput = false;
-
- private static String gradesPath;
- private static String salariesPath;
- private static String outputPath;
-
- private static boolean parseParameters(String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- if (args.length == 1) {
- fileOutput = true;
- outputPath = args[0];
- } else if (args.length == 3) {
- fileInput = true;
- fileOutput = true;
- gradesPath = args[0];
- salariesPath = args[1];
- outputPath = args[2];
- } else {
- System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> <input path 2> " +
- "<result path>");
- return false;
- }
- } else {
- System.out.println("Executing WindowJoin with generated data.");
- System.out.println(" Provide parameter to write to file.");
- System.out.println(" Usage: WindowJoin <result path>");
- }
- return true;
- }
-
- private static Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> getInputStreams(
- StreamExecutionEnvironment env) {
-
- DataStream<Tuple3<Long, String, Integer>> grades;
- DataStream<Tuple3<Long, String, Integer>> salaries;
-
- if (fileInput) {
- grades = env.readTextFile(gradesPath).map(new MySourceMap());
- salaries = env.readTextFile(salariesPath).map(new MySourceMap());
- } else {
- grades = env.addSource(new GradeSource());
- salaries = env.addSource(new SalarySource());
- }
-
- return Tuple2.of(grades, salaries);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
deleted file mode 100644
index 15c1280..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join.util;
-
-public class WindowJoinData {
-
- public static final String GRADES_INPUT = "(0,john,5)\n" + "(0,tom,3)\n" + "(0,alice,1)\n" + "(0,grace,5)\n" +
- "(1,john,4)\n" + "(1,bob,1)\n" + "(1,alice,2)\n" + "(1,alice,3)\n" + "(1,bob,5)\n" + "(1,alice,3)\n" + "(1,tom,5)\n" +
- "(2,john,2)\n" + "(2,john,1)\n" + "(2,grace,2)\n" + "(2,jerry,2)\n" + "(2,tom,4)\n" + "(2,bob,4)\n" + "(2,bob,2)\n" +
- "(3, tom,2)\n" + "(3,alice,5)\n" + "(3,grace,5)\n" + "(3,grace,1)\n" + "(3,alice,1)\n" + "(3,grace,3)\n" + "(3,tom,1)\n" +
- "(4,jerry,5)\n" + "(4,john,3)\n" + "(4,john,4)\n" + "(4,john,1)\n" + "(4,jerry,3)\n" + "(4,grace,3)\n" + "(4,bob,3)\n" +
- "(5,john,3)\n" + "(5,jerry,4)\n" + "(5,tom,5)\n" + "(5,tom,4)\n" + "(5,john,2)\n" + "(5,jerry,1)\n" + "(5,bob,1)\n" +
- "(6,john,5)\n" + "(6,grace,4)\n" + "(6,tom,5)\n" + "(6,john,4)\n" + "(6,tom,1)\n" + "(6,grace,1)\n" + "(6,john,2)\n" +
- "(7,jerry,3)\n" + "(7,jerry,5)\n" + "(7,tom,2)\n" + "(7,tom,2)\n" + "(7,alice,4)\n" + "(7,tom,4)\n" + "(7,jerry,4)\n" +
- "(8,john,3)\n" + "(8,grace,4)\n" + "(8,tom,3)\n" + "(8,jerry,4)\n" + "(8,john,5)\n" + "(8,john,4)\n" + "(8,jerry,1)\n" +
- "(9,john,5)\n" + "(9,alice,2)\n" + "(9,tom,1)\n" + "(9,alice,5)\n" + "(9,grace,4)\n" + "(9,bob,4)\n" + "(9,jerry,1)\n" +
- "(10,john,5)\n" + "(10,tom,4)\n" + "(10,tom,5)\n" + "(10,jerry,5)\n" + "(10,tom,1)\n" + "(10,grace,3)\n" + "(10,bob,5)\n" +
- "(11,john,1)\n" + "(11,alice,1)\n" + "(11,grace,3)\n" + "(11,grace,1)\n" + "(11,jerry,1)\n" + "(11,jerry,4)\n" +
- "(12,bob,4)\n" + "(12,alice,3)\n" + "(12,tom,5)\n" + "(12,alice,4)\n" + "(12,alice,4)\n" + "(12,grace,4)\n" + "(12,john,5)\n" +
- "(13,john,5)\n" + "(13,grace,4)\n" + "(13,tom,4)\n" + "(13,john,4)\n" + "(13,john,5)\n" + "(13,alice,5)\n" + "(13,jerry,5)\n" +
- "(14,john,3)\n" + "(14,tom,5)\n" + "(14,jerry,4)\n" + "(14,grace,4)\n" + "(14,john,3)\n" + "(14,bob,2)";
-
- public static final String SALARIES_INPUT = "(0,john,6469)\n" + "(0,jerry,6760)\n" + "(0,jerry,8069)\n" +
- "(1,tom,3662)\n" + "(1,grace,8427)\n" + "(1,john,9425)\n" + "(1,bob,9018)\n" + "(1,john,352)\n" + "(1,tom,3770)\n" +
- "(2,grace,7622)\n" + "(2,jerry,7441)\n" + "(2,alice,1468)\n" + "(2,bob,5472)\n" + "(2,grace,898)\n" +
- "(3,tom,3849)\n" + "(3,grace,1865)\n" + "(3,alice,5582)\n" + "(3,john,9511)\n" + "(3,alice,1541)\n" +
- "(4,john,2477)\n" + "(4,grace,3561)\n" + "(4,john,1670)\n" + "(4,grace,7290)\n" + "(4,grace,6565)\n" +
- "(5,tom,6179)\n" + "(5,tom,1601)\n" + "(5,john,2940)\n" + "(5,bob,4685)\n" + "(5,bob,710)\n" + "(5,bob,5936)\n" +
- "(6,jerry,1412)\n" + "(6,grace,6515)\n" + "(6,grace,3321)\n" + "(6,tom,8088)\n" + "(6,john,2876)\n" +
- "(7,bob,9896)\n" + "(7,grace,7368)\n" + "(7,grace,9749)\n" + "(7,bob,2048)\n" + "(7,alice,4782)\n" +
- "(8,alice,3375)\n" + "(8,tom,5841)\n" + "(8,bob,958)\n" + "(8,bob,5258)\n" + "(8,tom,3935)\n" + "(8,jerry,4394)\n" +
- "(9,alice,102)\n" + "(9,alice,4931)\n" + "(9,alice,5240)\n" + "(9,jerry,7951)\n" + "(9,john,5675)\n" +
- "(10,bob,609)\n" + "(10,alice,5997)\n" + "(10,jerry,9651)\n" + "(10,alice,1328)\n" + "(10,bob,1022)\n" +
- "(11,grace,2578)\n" + "(11,jerry,9704)\n" + "(11,tom,4476)\n" + "(11,grace,3784)\n" + "(11,alice,6144)\n" +
- "(12,bob,6213)\n" + "(12,alice,7525)\n" + "(12,jerry,2908)\n" + "(12,grace,8464)\n" + "(12,jerry,9920)\n" +
- "(13,bob,3720)\n" + "(13,bob,7612)\n" + "(13,alice,7211)\n" + "(13,jerry,6484)\n" + "(13,alice,1711)\n" +
- "(14,jerry,5994)\n" + "(14,grace,928)\n" + "(14,jerry,2492)\n" + "(14,grace,9080)\n" + "(14,tom,4330)\n" +
- "(15,bob,8302)\n" + "(15,john,4981)\n" + "(15,tom,1781)\n" + "(15,grace,1379)\n" + "(15,jerry,3700)\n" +
- "(16,jerry,3584)\n" + "(16,jerry,2038)\n" + "(16,jerry,3902)\n" + "(16,tom,1336)\n" + "(16,jerry,7500)\n" +
- "(17,tom,3648)\n" + "(17,alice,2533)\n" + "(17,tom,8685)\n" + "(17,bob,3968)\n" + "(17,tom,3241)\n" + "(17,bob,7461)\n" +
- "(18,jerry,2138)\n" + "(18,alice,7503)\n" + "(18,alice,6424)\n" + "(18,tom,140)\n" + "(18,john,9802)\n" +
- "(19,grace,2977)\n" + "(19,grace,889)\n" + "(19,john,1338)";
-
- private WindowJoinData() {
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
deleted file mode 100644
index ce227e4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.ml;
-
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Skeleton for incremental machine learning algorithm consisting of a
- * pre-computed model, which gets updated for the new inputs and new input data
- * for which the job provides predictions.
- *
- * <p>
- * This may serve as a base of a number of algorithms, e.g. updating an
- * incremental Alternating Least Squares model while also providing the
- * predictions.
- *
- * <p/>
- * This example shows how to use:
- * <ul>
- * <li>Connected streams
- * <li>CoFunctions
- * <li>Tuple data types
- * </ul>
- */
-public class IncrementalLearningSkeleton {
-
- private static DataStream<Integer> trainingData = null;
- private static DataStream<Integer> newData = null;
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- trainingData = env.addSource(new FiniteTrainingDataSource());
- newData = env.addSource(new FiniteNewDataSource());
-
- // build new model on every second of new data
- DataStream<Double[]> model = trainingData
- .assignTimestamps(new LinearTimestamp())
- .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
- .apply(new PartialModelBuilder());
-
- // use partial model for newData
- DataStream<Integer> prediction = newData.connect(model).map(new Predictor());
-
- // emit result
- if (fileOutput) {
- prediction.writeAsText(outputPath, 1);
- } else {
- prediction.print();
- }
-
- // execute program
- env.execute("Streaming Incremental Learning");
- }
-
- // *************************************************************************
- // USER FUNCTIONS
- // *************************************************************************
-
- /**
- * Feeds new data for newData. By default it is implemented as constantly
- * emitting the Integer 1 in a loop.
- */
- public static class FiniteNewDataSource implements SourceFunction<Integer> {
- private static final long serialVersionUID = 1L;
- private int counter;
-
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- Thread.sleep(15);
- while (counter < 50) {
- ctx.collect(getNewData());
- }
- }
-
- @Override
- public void cancel() {
- // No cleanup needed
- }
-
- private Integer getNewData() throws InterruptedException {
- Thread.sleep(5);
- counter++;
- return 1;
- }
- }
-
- /**
- * Feeds new training data for the partial model builder. By default it is
- * implemented as constantly emitting the Integer 1 in a loop.
- */
- public static class FiniteTrainingDataSource implements SourceFunction<Integer> {
- private static final long serialVersionUID = 1L;
- private int counter = 0;
-
- @Override
- public void run(SourceContext<Integer> collector) throws Exception {
- while (counter < 8200) {
- collector.collect(getTrainingData());
- }
- }
-
- @Override
- public void cancel() {
- // No cleanup needed
- }
-
- private Integer getTrainingData() throws InterruptedException {
- counter++;
- return 1;
- }
- }
-
- public static class LinearTimestamp implements TimestampExtractor<Integer> {
- private static final long serialVersionUID = 1L;
-
- private long counter = 0L;
-
- @Override
- public long extractTimestamp(Integer element, long currentTimestamp) {
- return counter += 10L;
- }
-
- @Override
- public long extractWatermark(Integer element, long currentTimestamp) {
- return counter - 1;
- }
-
- @Override
- public long getCurrentWatermark() {
- return Long.MIN_VALUE;
- }
-
- }
-
- /**
- * Builds up-to-date partial models on new training data.
- */
- public static class PartialModelBuilder implements AllWindowFunction<Integer, Double[], TimeWindow> {
- private static final long serialVersionUID = 1L;
-
- protected Double[] buildPartialModel(Iterable<Integer> values) {
- return new Double[]{1.};
- }
-
- @Override
- public void apply(TimeWindow window, Iterable<Integer> values, Collector<Double[]> out) throws Exception {
- out.collect(buildPartialModel(values));
- }
- }
-
- /**
- * Creates newData using the model produced in batch-processing and the
- * up-to-date partial model.
- * <p/>
- * <p>
- * By defaults emits the Integer 0 for every newData and the Integer 1
- * for every model update.
- * </p>
- */
- public static class Predictor implements CoMapFunction<Integer, Double[], Integer> {
- private static final long serialVersionUID = 1L;
-
- Double[] batchModel = null;
- Double[] partialModel = null;
-
- @Override
- public Integer map1(Integer value) {
- // Return newData
- return predict(value);
- }
-
- @Override
- public Integer map2(Double[] value) {
- // Update model
- partialModel = value;
- batchModel = getBatchModel();
- return 1;
- }
-
- // pulls model built with batch-job on the old training data
- protected Double[] getBatchModel() {
- return new Double[]{0.};
- }
-
- // performs newData using the two models
- protected Integer predict(Integer inTuple) {
- return 0;
- }
-
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String outputPath;
-
- private static boolean parseParameters(String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- fileOutput = true;
- if (args.length == 1) {
- outputPath = args[0];
- } else {
- System.err.println("Usage: IncrementalLearningSkeleton <result path>");
- return false;
- }
- } else {
- System.out.println("Executing IncrementalLearningSkeleton with generated data.");
- System.out.println(" Provide parameter to write to file.");
- System.out.println(" Usage: IncrementalLearningSkeleton <result path>");
- }
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
deleted file mode 100644
index 8a6cd88..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.ml.util;
-
-public class IncrementalLearningSkeletonData {
-
- public static final String RESULTS = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
- "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "0\n" + "0\n" +
- "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
- "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
- "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
- "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
- "0\n" + "0\n" + "0\n" + "0\n";
-
- private IncrementalLearningSkeletonData() {
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
deleted file mode 100644
index 17add2c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.socket;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
-
-/**
- * This example shows an implementation of WordCount with data from a text
- * socket. To run the example make sure that the service providing the text data
- * is already up and running.
- * <p/>
- * <p/>
- * To start an example socket text stream on your local machine run netcat from
- * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
- * port number.
- * <p/>
- * <p/>
- * <p/>
- * Usage:
- * <code>SocketTextStreamWordCount <hostname> <port> <result path></code>
- * <br>
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>use StreamExecutionEnvironment.socketTextStream
- * <li>write a simple Flink program,
- * <li>write and use user-defined functions.
- * </ul>
- *
- * @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
- */
-public class SocketTextStreamWordCount {
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- // set up the execution environment
- final StreamExecutionEnvironment env = StreamExecutionEnvironment
- .getExecutionEnvironment();
-
- // get input data
- DataStream<String> text = env.socketTextStream(hostName, port, '\n', 0);
-
- DataStream<Tuple2<String, Integer>> counts =
- // split up the lines in pairs (2-tuples) containing: (word,1)
- text.flatMap(new Tokenizer())
- // group by the tuple field "0" and sum up tuple field "1"
- .keyBy(0)
- .sum(1);
-
- if (fileOutput) {
- counts.writeAsText(outputPath, 1);
- } else {
- counts.print();
- }
-
- // execute program
- env.execute("WordCount from SocketTextStream Example");
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String hostName;
- private static int port;
- private static String outputPath;
-
- private static boolean parseParameters(String[] args) {
-
- // parse input arguments
- if (args.length == 3) {
- fileOutput = true;
- hostName = args[0];
- port = Integer.valueOf(args[1]);
- outputPath = args[2];
- } else if (args.length == 2) {
- hostName = args[0];
- port = Integer.valueOf(args[1]);
- } else {
- System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]");
- return false;
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
deleted file mode 100644
index c2477b5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.twitter;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
-import org.apache.flink.streaming.connectors.twitter.TwitterSource;
-import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
-import org.apache.flink.util.Collector;
-import org.apache.sling.commons.json.JSONException;
-
-import java.util.StringTokenizer;
-
-/**
- * Implements the "TwitterStream" program that computes a most used word
- * occurrence over JSON files in a streaming fashion.
- * <p/>
- * <p/>
- * The input is a JSON text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>TwitterStream <text path></code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link TwitterStreamData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>acquire external data,
- * <li>use in-line defined functions,
- * <li>handle flattened stream inputs.
- * </ul>
- */
-public class TwitterStream {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(String[] args) throws Exception {
- if (!parseParameters(args)) {
- return;
- }
-
- // set up the execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // get input data
- DataStream<String> streamSource = getTextDataStream(env);
-
- DataStream<Tuple2<String, Integer>> tweets = streamSource
- // selecting English tweets and splitting to (word, 1)
- .flatMap(new SelectEnglishAndTokenizeFlatMap())
- // group by words and sum their occurrences
- .keyBy(0).sum(1);
-
- // emit result
- if (fileOutput) {
- tweets.writeAsText(outputPath);
- } else {
- tweets.print();
- }
-
- // execute program
- env.execute("Twitter Streaming Example");
- }
-
- // *************************************************************************
- // USER FUNCTIONS
- // *************************************************************************
-
- /**
- * Makes sentences from English tweets.
- * <p/>
- * <p>
- * Implements a string tokenizer that splits sentences into words as a
- * user-defined FlatMapFunction. The function takes a line (String) and
- * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
- * Integer>).
- * </p>
- */
- public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, Tuple2<String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- /**
- * Select the language from the incoming JSON text
- */
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
- try {
- if (getString(value, "user.lang").equals("en")) {
- // message of tweet
- StringTokenizer tokenizer = new StringTokenizer(getString(value, "text"));
-
- // split the message
- while (tokenizer.hasMoreTokens()) {
- String result = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();
-
- if (result != null && !result.equals("")) {
- out.collect(new Tuple2<String, Integer>(result, 1));
- }
- }
- }
- } catch (JSONException e) {
- // the JSON was not parsed correctly
- }
- }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileInput = false;
- private static boolean fileOutput = false;
- private static String propertiesPath;
- private static String outputPath;
-
- private static boolean parseParameters(String[] args) {
- if (args.length > 0) {
- // parse input arguments
- fileOutput = true;
- if (args.length == 2) {
- fileInput = true;
- propertiesPath = args[0];
- outputPath = args[1];
- } else if (args.length == 1) {
- outputPath = args[0];
- } else {
- System.err.println("USAGE:\nTwitterStream [<pathToPropertiesFile>] <result path>");
- return false;
- }
- } else {
- System.out.println("Executing TwitterStream example with built-in default data.");
- System.out.println(" Provide parameters to read input data from a file.");
- System.out.println(" USAGE: TwitterStream [<pathToPropertiesFile>] <result path>");
- }
- return true;
- }
-
- private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
- if (fileInput) {
- // read the text file from given input path
- return env.addSource(new TwitterSource(propertiesPath));
- } else {
- // get default test text data
- return env.fromElements(TwitterStreamData.TEXTS);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java
deleted file mode 100644
index b06d193..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.twitter.util;
-
-//example data looking like tweets, but not acquired from Twitter
-public class TwitterStreamData {
- public static final String[] TEXTS = new String[] {
- "{\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"id\":000000000000000000,\"id_str\":\"000000000000000000\",\"text\":\"Apache Flink\",\"source\":null,\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":0000000000,\"id_str\":\"0000000000\",\"name\":\"Apache Flink\",\"screen_name\":\"Apache Flink\",\"location\":\"Berlin\",\"protected\":false,\"verified\":false,\"followers_count\":999999,\"friends_count\":99999,\"listed_count\":999,\"favourites_count\":9999,\"statuses_count\":999,\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"utc_offset\":7200,\"time_zone\":\"Amsterdam\",\"geo_enabled\":false,\"lang\":\"en\",\"entities\":{\"hashtags\":[{\"text\":\"example1\",\"indices\":[0,0]},{\"text\":\"tweet1\",\"indices\":[0,0]}]},\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C6E2EE\",\"profile_background_
tile\":false,\"profile_link_color\":\"1F98C7\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"252429\",\"profile_text_color\":\"666666\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null}",
- "{\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"id\":000000000000000001,\"id_str\":\"000000000000000000\",\"text\":\"Apache Flink\",\"source\":null,\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":0000000000,\"id_str\":\"0000000000\",\"name\":\"Apache Flink\",\"screen_name\":\"Apache Flink\",\"location\":\"Berlin\",\"protected\":false,\"verified\":false,\"followers_count\":999999,\"friends_count\":99999,\"listed_count\":999,\"favourites_count\":9999,\"statuses_count\":999,\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"utc_offset\":7200,\"time_zone\":\"Amsterdam\",\"geo_enabled\":false,\"lang\":\"en\",\"entities\":{\"hashtags\":[{\"text\":\"example2\",\"indices\":[0,0]},{\"text\":\"tweet2\",\"indices\":[0,0]}]},\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C6E2EE\",\"profile_background_
tile\":false,\"profile_link_color\":\"1F98C7\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"252429\",\"profile_text_color\":\"666666\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null}",
- "{\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"id\":000000000000000002,\"id_str\":\"000000000000000000\",\"text\":\"Apache Flink\",\"source\":null,\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":0000000000,\"id_str\":\"0000000000\",\"name\":\"Apache Flink\",\"screen_name\":\"Apache Flink\",\"location\":\"Berlin\",\"protected\":false,\"verified\":false,\"followers_count\":999999,\"friends_count\":99999,\"listed_count\":999,\"favourites_count\":9999,\"statuses_count\":999,\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"utc_offset\":7200,\"time_zone\":\"Amsterdam\",\"geo_enabled\":false,\"lang\":\"en\",\"entities\":{\"hashtags\":[{\"text\":\"example3\",\"indices\":[0,0]},{\"text\":\"tweet3\",\"indices\":[0,0]}]},\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C6E2EE\",\"profile_background_
tile\":false,\"profile_link_color\":\"1F98C7\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"252429\",\"profile_text_color\":\"666666\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null}",
- };
-
- public static final String STREAMING_COUNTS_AS_TUPLES = "(apache,1)\n" + "(apache,2)\n" + "(apache,3)\n" + "(flink,1)\n" + "(flink,2)\n" + "(flink,3)\n";
-
- private TwitterStreamData() {
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
deleted file mode 100644
index 982b73d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-@SuppressWarnings("serial")
-public class GroupedProcessingTimeWindowExample {
-
- public static void main(String[] args) throws Exception {
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
-
- DataStream<Tuple2<Long, Long>> stream = env
- .addSource(new RichParallelSourceFunction<Tuple2<Long, Long>>() {
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
-
- final long startTime = System.currentTimeMillis();
-
- final long numElements = 20000000;
- final long numKeys = 10000;
- long val = 1L;
- long count = 0L;
-
-
- while (running && count < numElements) {
- count++;
- ctx.collect(new Tuple2<Long, Long>(val++, 1L));
-
- if (val > numKeys) {
- val = 1L;
- }
- }
-
- final long endTime = System.currentTimeMillis();
- System.out.println("Took " + (endTime-startTime) + " msecs for " + numElements + " values");
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
-
- stream
- .keyBy(0)
- .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
- .reduce(new SummingReducer())
-
- // alternative: use a apply function which does not pre-aggregate
-// .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
-// .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
-// .apply(new SummingWindowFunction())
-
- .addSink(new SinkFunction<Tuple2<Long, Long>>() {
- @Override
- public void invoke(Tuple2<Long, Long> value) {
- }
- });
-
- env.execute();
- }
-
- public static class FirstFieldKeyExtractor<Type extends Tuple, Key> implements KeySelector<Type, Key> {
-
- @Override
- @SuppressWarnings("unchecked")
- public Key getKey(Type value) {
- return (Key) value.getField(0);
- }
- }
-
- public static class SummingWindowFunction implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
-
- @Override
- public void apply(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
- long sum = 0L;
- for (Tuple2<Long, Long> value : values) {
- sum += value.f1;
- }
-
- out.collect(new Tuple2<>(key, sum));
- }
- }
-
- public static class SummingReducer implements ReduceFunction<Tuple2<Long, Long>> {
-
- @Override
- public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2) {
- return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
deleted file mode 100644
index 3c63156..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class SessionWindowing {
-
- @SuppressWarnings("serial")
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.setParallelism(2);
-
- final List<Tuple3<String, Long, Integer>> input = new ArrayList<>();
-
- input.add(new Tuple3<>("a", 1L, 1));
- input.add(new Tuple3<>("b", 1L, 1));
- input.add(new Tuple3<>("b", 3L, 1));
- input.add(new Tuple3<>("b", 5L, 1));
- input.add(new Tuple3<>("c", 6L, 1));
- // We expect to detect the session "a" earlier than this point (the old
- // functionality can only detect here when the next starts)
- input.add(new Tuple3<>("a", 10L, 1));
- // We expect to detect session "b" and "c" at this point as well
- input.add(new Tuple3<>("c", 11L, 1));
-
- DataStream<Tuple3<String, Long, Integer>> source = env
- .addSource(new EventTimeSourceFunction<Tuple3<String,Long,Integer>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void run(SourceContext<Tuple3<String, Long, Integer>> ctx) throws Exception {
- for (Tuple3<String, Long, Integer> value : input) {
- ctx.collectWithTimestamp(value, value.f1);
- ctx.emitWatermark(new Watermark(value.f1 - 1));
- if (!fileOutput) {
- System.out.println("Collected: " + value);
- }
- }
- ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
- }
-
- @Override
- public void cancel() {
- }
- });
-
- // We create sessions for each id with max timeout of 3 time units
- DataStream<Tuple3<String, Long, Integer>> aggregated = source
- .keyBy(0)
- .window(GlobalWindows.create())
- .trigger(new SessionTrigger(3L))
- .sum(2);
-
- if (fileOutput) {
- aggregated.writeAsText(outputPath);
- } else {
- aggregated.print();
- }
-
- env.execute();
- }
-
- private static class SessionTrigger implements Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {
-
- private static final long serialVersionUID = 1L;
-
- private final Long sessionTimeout;
-
- public SessionTrigger(Long sessionTimeout) {
- this.sessionTimeout = sessionTimeout;
-
- }
-
- @Override
- public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
-
- OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
- Long lastSeen = lastSeenState.value();
-
- Long timeSinceLastEvent = timestamp - lastSeen;
-
- // Update the last seen event time
- lastSeenState.update(timestamp);
-
- ctx.registerEventTimeTimer(lastSeen + sessionTimeout);
-
- if (timeSinceLastEvent > sessionTimeout) {
- return TriggerResult.FIRE_AND_PURGE;
- } else {
- return TriggerResult.CONTINUE;
- }
- }
-
- @Override
- public TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception {
- OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
- Long lastSeen = lastSeenState.value();
-
- if (time - lastSeen >= sessionTimeout) {
- return TriggerResult.FIRE_AND_PURGE;
- }
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public TriggerResult onProcessingTime(long time,
- TriggerContext ctx) throws Exception {
- return TriggerResult.CONTINUE;
- }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String outputPath;
-
- private static boolean parseParameters(String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- if (args.length == 1) {
- fileOutput = true;
- outputPath = args[0];
- } else {
- System.err.println("Usage: SessionWindowing <result path>");
- return false;
- }
- }
- return true;
- }
-
-}