Posted to commits@spark.apache.org by sr...@apache.org on 2016/03/09 19:27:51 UTC
[4/8] spark git commit: [SPARK-13595][BUILD] Move docker, extras modules into external
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/README.md
----------------------------------------------------------------------
diff --git a/extras/README.md b/extras/README.md
deleted file mode 100644
index 1b4174b..0000000
--- a/extras/README.md
+++ /dev/null
@@ -1 +0,0 @@
-This directory contains build components not included by default in Spark's build.
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/README.md
----------------------------------------------------------------------
diff --git a/extras/java8-tests/README.md b/extras/java8-tests/README.md
deleted file mode 100644
index dc9e87f..0000000
--- a/extras/java8-tests/README.md
+++ /dev/null
@@ -1,24 +0,0 @@
-# Java 8 Test Suites
-
-These tests require having Java 8 installed and are isolated from the main Spark build.
-If Java 8 is not your system's default Java version, you will need to point Spark's build
-to your Java location. The set-up depends a bit on the build system:
-
-* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass
- `-java-home` to the sbt launch script. If a Java 8 JDK is detected sbt will automatically
- include the Java 8 test project.
-
- `$ JAVA_HOME=/opt/jdk1.8.0/ build/sbt clean "test-only org.apache.spark.Java8APISuite"`
-
-* For Maven users,
-
- Maven users can also refer to their Java 8 directory using JAVA_HOME. However, Maven will not
- automatically detect the presence of a Java 8 JDK, so a special build profile `-Pjava8-tests`
- must be used.
-
- `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests`
- `$ JAVA_HOME=/opt/jdk1.8.0/ mvn test -Pjava8-tests -DwildcardSuites=org.apache.spark.Java8APISuite`
-
- Note that the above command can only be run from project root directory since this module
- depends on core and the test-jars of core and streaming. This means an install step is
- required to make the test dependencies visible to the Java 8 sub-project.
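The suites removed above exercise Spark's Java API with JDK 8 lambda syntax in place of anonymous inner classes. A minimal standalone sketch of that style, assuming a local SparkContext; the class name and input values are illustrative only and do not come from the deleted suites:

    import java.util.Arrays;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class Java8LambdaExample {
      public static void main(String[] args) {
        // Local context, mirroring the suites' setUp(); illustrative only.
        JavaSparkContext sc = new JavaSparkContext("local", "Java8LambdaExample");
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        // Lambdas replace anonymous Function/Function2 implementations.
        int sum = rdd.map(x -> x * 2).reduce((a, b) -> a + b);
        System.out.println(sum); // prints 30
        sc.stop();
      }
    }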
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml
deleted file mode 100644
index 0ad9c53..0000000
--- a/extras/java8-tests/pom.xml
+++ /dev/null
@@ -1,161 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-~ Licensed to the Apache Software Foundation (ASF) under one or more
-~ contributor license agreements. See the NOTICE file distributed with
-~ this work for additional information regarding copyright ownership.
-~ The ASF licenses this file to You under the Apache License, Version 2.0
-~ (the "License"); you may not use this file except in compliance with
-~ the License. You may obtain a copy of the License at
-~
-~ http://www.apache.org/licenses/LICENSE-2.0
-~
-~ Unless required by applicable law or agreed to in writing, software
-~ distributed under the License is distributed on an "AS IS" BASIS,
-~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-~ See the License for the specific language governing permissions and
-~ limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>java8-tests_2.11</artifactId>
- <packaging>pom</packaging>
- <name>Spark Project Java8 Tests POM</name>
-
- <properties>
- <sbt.project.name>java8-tests</sbt.project.name>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
-
- <profiles>
- <profile>
- <id>java8-tests</id>
- </profile>
- </profiles>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-deploy-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-install-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <executions>
- <execution>
- <id>test</id>
- <goals>
- <goal>test</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <systemPropertyVariables>
- <!-- For some reason surefire isn't setting this log4j file on the
- test classpath automatically. So we add it manually. -->
- <log4j.configuration>
- file:src/test/resources/log4j.properties
- </log4j.configuration>
- </systemPropertyVariables>
- <skipTests>false</skipTests>
- <includes>
- <include>**/Suite*.java</include>
- <include>**/*Suite.java</include>
- </includes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <executions>
- <execution>
- <id>test-compile-first</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <fork>true</fork>
- <verbose>true</verbose>
- <forceJavacCompilerUse>true</forceJavacCompilerUse>
- <source>1.8</source>
- <compilerVersion>1.8</compilerVersion>
- <target>1.8</target>
- <encoding>UTF-8</encoding>
- <maxmem>1024m</maxmem>
- </configuration>
- </plugin>
- <plugin>
- <!-- disabled -->
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <executions>
- <execution>
- <phase>none</phase>
- </execution>
- <execution>
- <id>scala-compile-first</id>
- <phase>none</phase>
- </execution>
- <execution>
- <id>scala-test-compile-first</id>
- <phase>none</phase>
- </execution>
- <execution>
- <id>attach-scaladocs</id>
- <phase>none</phase>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
deleted file mode 100644
index c0b58e7..0000000
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ /dev/null
@@ -1,393 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark;
-
-import java.io.File;
-import java.io.Serializable;
-import java.util.*;
-
-import scala.Tuple2;
-
-import com.google.common.collect.Iterables;
-import com.google.common.io.Files;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.api.java.JavaDoubleRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.java.function.*;
-import org.apache.spark.util.Utils;
-
-/**
- * Most of these tests replicate org.apache.spark.JavaAPISuite using java 8
- * lambda syntax.
- */
-public class Java8APISuite implements Serializable {
- static int foreachCalls = 0;
- private transient JavaSparkContext sc;
-
- @Before
- public void setUp() {
- sc = new JavaSparkContext("local", "JavaAPISuite");
- }
-
- @After
- public void tearDown() {
- sc.stop();
- sc = null;
- }
-
- @Test
- public void foreachWithAnonymousClass() {
- foreachCalls = 0;
- JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
- rdd.foreach(new VoidFunction<String>() {
- @Override
- public void call(String s) {
- foreachCalls++;
- }
- });
- Assert.assertEquals(2, foreachCalls);
- }
-
- @Test
- public void foreach() {
- foreachCalls = 0;
- JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
- rdd.foreach(x -> foreachCalls++);
- Assert.assertEquals(2, foreachCalls);
- }
-
- @Test
- public void groupBy() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
- Function<Integer, Boolean> isOdd = x -> x % 2 == 0;
- JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
- Assert.assertEquals(2, oddsAndEvens.count());
- Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
- Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
-
- oddsAndEvens = rdd.groupBy(isOdd, 1);
- Assert.assertEquals(2, oddsAndEvens.count());
- Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
- Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
- }
-
- @Test
- public void leftOuterJoin() {
- JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
- new Tuple2<>(1, 1),
- new Tuple2<>(1, 2),
- new Tuple2<>(2, 1),
- new Tuple2<>(3, 1)
- ));
- JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
- new Tuple2<>(1, 'x'),
- new Tuple2<>(2, 'y'),
- new Tuple2<>(2, 'z'),
- new Tuple2<>(4, 'w')
- ));
- List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
- rdd1.leftOuterJoin(rdd2).collect();
- Assert.assertEquals(5, joined.size());
- Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
- rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
- Assert.assertEquals(3, firstUnmatched._1().intValue());
- }
-
- @Test
- public void foldReduce() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
- Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
-
- int sum = rdd.fold(0, add);
- Assert.assertEquals(33, sum);
-
- sum = rdd.reduce(add);
- Assert.assertEquals(33, sum);
- }
-
- @Test
- public void foldByKey() {
- List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
- new Tuple2<>(2, 1),
- new Tuple2<>(2, 1),
- new Tuple2<>(1, 1),
- new Tuple2<>(3, 2),
- new Tuple2<>(3, 1)
- );
- JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
- JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
- Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
- Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
- Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
- }
-
- @Test
- public void reduceByKey() {
- List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
- new Tuple2<>(2, 1),
- new Tuple2<>(2, 1),
- new Tuple2<>(1, 1),
- new Tuple2<>(3, 2),
- new Tuple2<>(3, 1)
- );
- JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
- JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
- Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
- Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
- Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
-
- Map<Integer, Integer> localCounts = counts.collectAsMap();
- Assert.assertEquals(1, localCounts.get(1).intValue());
- Assert.assertEquals(2, localCounts.get(2).intValue());
- Assert.assertEquals(3, localCounts.get(3).intValue());
-
- localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
- Assert.assertEquals(1, localCounts.get(1).intValue());
- Assert.assertEquals(2, localCounts.get(2).intValue());
- Assert.assertEquals(3, localCounts.get(3).intValue());
- }
-
- @Test
- public void map() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
- doubles.collect();
- JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x))
- .cache();
- pairs.collect();
- JavaRDD<String> strings = rdd.map(Object::toString).cache();
- strings.collect();
- }
-
- @Test
- public void flatMap() {
- JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
- "The quick brown fox jumps over the lazy dog."));
- JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")));
-
- Assert.assertEquals("Hello", words.first());
- Assert.assertEquals(11, words.count());
-
- JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
- List<Tuple2<String, String>> pairs2 = new LinkedList<>();
- for (String word : s.split(" ")) {
- pairs2.add(new Tuple2<>(word, word));
- }
- return pairs2;
- });
-
- Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first());
- Assert.assertEquals(11, pairs.count());
-
- JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
- List<Double> lengths = new LinkedList<>();
- for (String word : s.split(" ")) {
- lengths.add((double) word.length());
- }
- return lengths;
- });
-
- Assert.assertEquals(5.0, doubles.first(), 0.01);
- Assert.assertEquals(11, pairs.count());
- }
-
- @Test
- public void mapsFromPairsToPairs() {
- List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<>(1, "a"),
- new Tuple2<>(2, "aa"),
- new Tuple2<>(3, "aaa")
- );
- JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
-
- // Regression test for SPARK-668:
- JavaPairRDD<String, Integer> swapped =
- pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()));
- swapped.collect();
-
- // There was never a bug here, but it's worth testing:
- pairRDD.map(Tuple2::swap).collect();
- }
-
- @Test
- public void mapPartitions() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
- JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
- int sum = 0;
- while (iter.hasNext()) {
- sum += iter.next();
- }
- return Collections.singletonList(sum);
- });
-
- Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
- }
-
- @Test
- public void sequenceFile() {
- File tempDir = Files.createTempDir();
- tempDir.deleteOnExit();
- String outputDir = new File(tempDir, "output").getAbsolutePath();
- List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<>(1, "a"),
- new Tuple2<>(2, "aa"),
- new Tuple2<>(3, "aaa")
- );
- JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
- rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
- .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
- // Try reading the output back as an object file
- JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
- .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
- Assert.assertEquals(pairs, readRDD.collect());
- Utils.deleteRecursively(tempDir);
- }
-
- @Test
- public void zip() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x);
- JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
- zipped.count();
- }
-
- @Test
- public void zipPartitions() {
- JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
- JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
- FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
- (Iterator<Integer> i, Iterator<String> s) -> {
- int sizeI = 0;
- while (i.hasNext()) {
- sizeI += 1;
- i.next();
- }
- int sizeS = 0;
- while (s.hasNext()) {
- sizeS += 1;
- s.next();
- }
- return Arrays.asList(sizeI, sizeS).iterator();
- };
- JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
- Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
- }
-
- @Test
- public void accumulators() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-
- Accumulator<Integer> intAccum = sc.intAccumulator(10);
- rdd.foreach(intAccum::add);
- Assert.assertEquals((Integer) 25, intAccum.value());
-
- Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
- rdd.foreach(x -> doubleAccum.add((double) x));
- Assert.assertEquals((Double) 25.0, doubleAccum.value());
-
- // Try a custom accumulator type
- AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
- @Override
- public Float addInPlace(Float r, Float t) {
- return r + t;
- }
- @Override
- public Float addAccumulator(Float r, Float t) {
- return r + t;
- }
- @Override
- public Float zero(Float initialValue) {
- return 0.0f;
- }
- };
-
- Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
- rdd.foreach(x -> floatAccum.add((float) x));
- Assert.assertEquals((Float) 25.0f, floatAccum.value());
-
- // Test the setValue method
- floatAccum.setValue(5.0f);
- Assert.assertEquals((Float) 5.0f, floatAccum.value());
- }
-
- @Test
- public void keyBy() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
- List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect();
- Assert.assertEquals(new Tuple2<>("1", 1), s.get(0));
- Assert.assertEquals(new Tuple2<>("2", 2), s.get(1));
- }
-
- @Test
- public void mapOnPairRDD() {
- JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
- JavaPairRDD<Integer, Integer> rdd2 =
- rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
- JavaPairRDD<Integer, Integer> rdd3 =
- rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
- Assert.assertEquals(Arrays.asList(
- new Tuple2<>(1, 1),
- new Tuple2<>(0, 2),
- new Tuple2<>(1, 3),
- new Tuple2<>(0, 4)), rdd3.collect());
- }
-
- @Test
- public void collectPartitions() {
- JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
-
- JavaPairRDD<Integer, Integer> rdd2 =
- rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
- List<Integer>[] parts = rdd1.collectPartitions(new int[]{0});
- Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
-
- parts = rdd1.collectPartitions(new int[]{1, 2});
- Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
- Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
-
- Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
- rdd2.collectPartitions(new int[]{0})[0]);
-
- List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2});
- Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]);
- Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)),
- parts2[1]);
- }
-
- @Test
- public void collectAsMapWithIntArrayValues() {
- // Regression test for SPARK-1040
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
- JavaPairRDD<Integer, int[]> pairRDD =
- rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
- pairRDD.collect(); // Works fine
- pairRDD.collectAsMap(); // Used to crash with ClassCastException
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
deleted file mode 100644
index 604d818..0000000
--- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ /dev/null
@@ -1,905 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import java.io.Serializable;
-import java.util.*;
-
-import scala.Tuple2;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.spark.Accumulator;
-import org.apache.spark.HashPartitioner;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
-
-/**
- * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8
- * lambda syntax.
- */
-@SuppressWarnings("unchecked")
-public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
-
- @Test
- public void testMap() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("hello", "world"),
- Arrays.asList("goodnight", "moon"));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(5, 5),
- Arrays.asList(9, 4));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> letterCount = stream.map(String::length);
- JavaTestUtils.attachTestOutputStream(letterCount);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @Test
- public void testFilter() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red sox"));
-
- List<List<String>> expected = Arrays.asList(
- Arrays.asList("giants"),
- Arrays.asList("yankees"));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
- JavaTestUtils.attachTestOutputStream(filtered);
- List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @Test
- public void testMapPartitions() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red sox"));
-
- List<List<String>> expected = Arrays.asList(
- Arrays.asList("GIANTSDODGERS"),
- Arrays.asList("YANKEESRED SOX"));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> mapped = stream.mapPartitions(in -> {
- String out = "";
- while (in.hasNext()) {
- out = out + in.next().toUpperCase();
- }
- return Lists.newArrayList(out);
- });
- JavaTestUtils.attachTestOutputStream(mapped);
- List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testReduce() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1, 2, 3),
- Arrays.asList(4, 5, 6),
- Arrays.asList(7, 8, 9));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(6),
- Arrays.asList(15),
- Arrays.asList(24));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> reduced = stream.reduce((x, y) -> x + y);
- JavaTestUtils.attachTestOutputStream(reduced);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testReduceByWindow() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1, 2, 3),
- Arrays.asList(4, 5, 6),
- Arrays.asList(7, 8, 9));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(6),
- Arrays.asList(21),
- Arrays.asList(39),
- Arrays.asList(24));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y,
- (x, y) -> x - y, new Duration(2000), new Duration(1000));
- JavaTestUtils.attachTestOutputStream(reducedWindowed);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testTransform() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1, 2, 3),
- Arrays.asList(4, 5, 6),
- Arrays.asList(7, 8, 9));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(3, 4, 5),
- Arrays.asList(6, 7, 8),
- Arrays.asList(9, 10, 11));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
-
- JavaTestUtils.attachTestOutputStream(transformed);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @Test
- public void testVariousTransform() {
- // tests whether all variations of transform can be called from Java
-
- List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-
- List<List<Tuple2<String, Integer>>> pairInputData =
- Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
- JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
-
- JavaDStream<Integer> transformed1 = stream.transform(in -> null);
- JavaDStream<Integer> transformed2 = stream.transform((x, time) -> null);
- JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(x -> null);
- JavaPairDStream<String, Integer> transformed4 = stream.transformToPair((x, time) -> null);
- JavaDStream<Integer> pairTransformed1 = pairStream.transform(x -> null);
- JavaDStream<Integer> pairTransformed2 = pairStream.transform((x, time) -> null);
- JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(x -> null);
- JavaPairDStream<String, String> pairTransformed4 =
- pairStream.transformToPair((x, time) -> null);
-
- }
-
- @Test
- public void testTransformWith() {
- List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", "dodgers"),
- new Tuple2<>("new york", "yankees")),
- Arrays.asList(
- new Tuple2<>("california", "sharks"),
- new Tuple2<>("new york", "rangers")));
-
- List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", "giants"),
- new Tuple2<>("new york", "mets")),
- Arrays.asList(
- new Tuple2<>("california", "ducks"),
- new Tuple2<>("new york", "islanders")));
-
-
- List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
- Sets.newHashSet(
- new Tuple2<>("california",
- new Tuple2<>("dodgers", "giants")),
- new Tuple2<>("new york",
- new Tuple2<>("yankees", "mets"))),
- Sets.newHashSet(
- new Tuple2<>("california",
- new Tuple2<>("sharks", "ducks")),
- new Tuple2<>("new york",
- new Tuple2<>("rangers", "islanders"))));
-
- JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
- ssc, stringStringKVStream1, 1);
- JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
-
- JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
- ssc, stringStringKVStream2, 1);
- JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
-
- JavaPairDStream<String, Tuple2<String, String>> joined =
- pairStream1.transformWithToPair(pairStream2,(x, y, z) -> x.join(y));
-
- JavaTestUtils.attachTestOutputStream(joined);
- List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
- List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
- for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
- unorderedResult.add(Sets.newHashSet(res));
- }
-
- Assert.assertEquals(expected, unorderedResult);
- }
-
-
- @Test
- public void testVariousTransformWith() {
- // tests whether all variations of transformWith can be called from Java
-
- List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
- List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
- JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
- JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
-
- List<List<Tuple2<String, Integer>>> pairInputData1 =
- Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
- List<List<Tuple2<Double, Character>>> pairInputData2 =
- Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
- JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
- JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
- JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
- JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
-
- JavaDStream<Double> transformed1 = stream1.transformWith(stream2, (x, y, z) -> null);
- JavaDStream<Double> transformed2 = stream1.transformWith(pairStream1,(x, y, z) -> null);
-
- JavaPairDStream<Double, Double> transformed3 =
- stream1.transformWithToPair(stream2,(x, y, z) -> null);
-
- JavaPairDStream<Double, Double> transformed4 =
- stream1.transformWithToPair(pairStream1,(x, y, z) -> null);
-
- JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(stream2,(x, y, z) -> null);
-
- JavaDStream<Double> pairTransformed2_ =
- pairStream1.transformWith(pairStream1,(x, y, z) -> null);
-
- JavaPairDStream<Double, Double> pairTransformed3 =
- pairStream1.transformWithToPair(stream2,(x, y, z) -> null);
-
- JavaPairDStream<Double, Double> pairTransformed4 =
- pairStream1.transformWithToPair(pairStream2,(x, y, z) -> null);
- }
-
- @Test
- public void testStreamingContextTransform() {
- List<List<Integer>> stream1input = Arrays.asList(
- Arrays.asList(1),
- Arrays.asList(2)
- );
-
- List<List<Integer>> stream2input = Arrays.asList(
- Arrays.asList(3),
- Arrays.asList(4)
- );
-
- List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
- Arrays.asList(new Tuple2<>(1, "x")),
- Arrays.asList(new Tuple2<>(2, "y"))
- );
-
- List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
- Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
- );
-
- JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
- JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
- JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
- JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
-
- List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
-
- // This is just to test whether this transform to JavaStream compiles
- JavaDStream<Long> transformed1 = ssc.transform(
- listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
- Assert.assertEquals(2, listOfRDDs.size());
- return null;
- });
-
- List<JavaDStream<?>> listOfDStreams2 =
- Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
-
- JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
- listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
- Assert.assertEquals(3, listOfRDDs.size());
- JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0);
- JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1);
- JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2);
- JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
- PairFunction<Integer, Integer, Integer> mapToTuple =
- (Integer i) -> new Tuple2<>(i, i);
- return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
- });
- JavaTestUtils.attachTestOutputStream(transformed2);
- List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
- JavaTestUtils.runStreams(ssc, 2, 2);
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testFlatMap() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("go", "giants"),
- Arrays.asList("boo", "dodgers"),
- Arrays.asList("athletics"));
-
- List<List<String>> expected = Arrays.asList(
- Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"),
- Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"),
- Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s"));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> flatMapped = stream.flatMap(s -> Lists.newArrayList(s.split("(?!^)")));
- JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @Test
- public void testForeachRDD() {
- final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
- final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1,1,1),
- Arrays.asList(1,1,1));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
-
- stream.foreachRDD(rdd -> {
- accumRdd.add(1);
- rdd.foreach(x -> accumEle.add(1));
- });
-
- // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
- stream.foreachRDD((rdd, time) -> null);
-
- JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(2, accumRdd.value().intValue());
- Assert.assertEquals(6, accumEle.value().intValue());
- }
-
- @Test
- public void testPairFlatMap() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("giants"),
- Arrays.asList("dodgers"),
- Arrays.asList("athletics"));
-
- List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(6, "g"),
- new Tuple2<>(6, "i"),
- new Tuple2<>(6, "a"),
- new Tuple2<>(6, "n"),
- new Tuple2<>(6, "t"),
- new Tuple2<>(6, "s")),
- Arrays.asList(
- new Tuple2<>(7, "d"),
- new Tuple2<>(7, "o"),
- new Tuple2<>(7, "d"),
- new Tuple2<>(7, "g"),
- new Tuple2<>(7, "e"),
- new Tuple2<>(7, "r"),
- new Tuple2<>(7, "s")),
- Arrays.asList(
- new Tuple2<>(9, "a"),
- new Tuple2<>(9, "t"),
- new Tuple2<>(9, "h"),
- new Tuple2<>(9, "l"),
- new Tuple2<>(9, "e"),
- new Tuple2<>(9, "t"),
- new Tuple2<>(9, "i"),
- new Tuple2<>(9, "c"),
- new Tuple2<>(9, "s")));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
- List<Tuple2<Integer, String>> out = Lists.newArrayList();
- for (String letter : s.split("(?!^)")) {
- out.add(new Tuple2<>(s.length(), letter));
- }
- return out;
- });
-
- JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- /*
- * Performs an order-invariant comparison of lists representing two RDD streams. This allows
- * us to account for ordering variation within individual RDD's which occurs during windowing.
- */
- public static <T extends Comparable<T>> void assertOrderInvariantEquals(
- List<List<T>> expected, List<List<T>> actual) {
- expected.forEach(list -> Collections.sort(list));
- List<List<T>> sortedActual = new ArrayList<>();
- actual.forEach(list -> {
- List<T> sortedList = new ArrayList<>(list);
- Collections.sort(sortedList);
- sortedActual.add(sortedList);
- });
- Assert.assertEquals(expected, sortedActual);
- }
-
- @Test
- public void testPairFilter() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red sox"));
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("giants", 6)),
- Arrays.asList(new Tuple2<>("yankees", 7)));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream =
- stream.mapToPair(x -> new Tuple2<>(x, x.length()));
- JavaPairDStream<String, Integer> filtered = pairStream.filter(x -> x._1().contains("a"));
- JavaTestUtils.attachTestOutputStream(filtered);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "dodgers"),
- new Tuple2<>("california", "giants"),
- new Tuple2<>("new york", "yankees"),
- new Tuple2<>("new york", "mets")),
- Arrays.asList(new Tuple2<>("california", "sharks"),
- new Tuple2<>("california", "ducks"),
- new Tuple2<>("new york", "rangers"),
- new Tuple2<>("new york", "islanders")));
-
- List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", 1),
- new Tuple2<>("california", 3),
- new Tuple2<>("new york", 4),
- new Tuple2<>("new york", 1)),
- Arrays.asList(
- new Tuple2<>("california", 5),
- new Tuple2<>("california", 5),
- new Tuple2<>("new york", 3),
- new Tuple2<>("new york", 1)));
-
- @Test
- public void testPairMap() { // Maps pair -> pair of different type
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(1, "california"),
- new Tuple2<>(3, "california"),
- new Tuple2<>(4, "new york"),
- new Tuple2<>(1, "new york")),
- Arrays.asList(
- new Tuple2<>(5, "california"),
- new Tuple2<>(5, "california"),
- new Tuple2<>(3, "new york"),
- new Tuple2<>(1, "new york")));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap());
- JavaTestUtils.attachTestOutputStream(reversed);
- List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testPairMapPartitions() { // Maps pair -> pair of different type
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(1, "california"),
- new Tuple2<>(3, "california"),
- new Tuple2<>(4, "new york"),
- new Tuple2<>(1, "new york")),
- Arrays.asList(
- new Tuple2<>(5, "california"),
- new Tuple2<>(5, "california"),
- new Tuple2<>(3, "new york"),
- new Tuple2<>(1, "new york")));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
- LinkedList<Tuple2<Integer, String>> out = new LinkedList<>();
- while (in.hasNext()) {
- Tuple2<String, Integer> next = in.next();
- out.add(next.swap());
- }
- return out;
- });
-
- JavaTestUtils.attachTestOutputStream(reversed);
- List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testPairMap2() { // Maps pair -> single
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(1, 3, 4, 1),
- Arrays.asList(5, 5, 3, 1));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
- JavaTestUtils.attachTestOutputStream(reversed);
- List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
- List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("hi", 1),
- new Tuple2<>("ho", 2)),
- Arrays.asList(
- new Tuple2<>("hi", 1),
- new Tuple2<>("ho", 2)));
-
- List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(1, "h"),
- new Tuple2<>(1, "i"),
- new Tuple2<>(2, "h"),
- new Tuple2<>(2, "o")),
- Arrays.asList(
- new Tuple2<>(1, "h"),
- new Tuple2<>(1, "i"),
- new Tuple2<>(2, "h"),
- new Tuple2<>(2, "o")));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
- List<Tuple2<Integer, String>> out = new LinkedList<>();
- for (Character s : in._1().toCharArray()) {
- out.add(new Tuple2<>(in._2(), s.toString()));
- }
- return out;
- });
-
- JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testPairReduceByKey() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(
- new Tuple2<>("california", 10),
- new Tuple2<>("new york", 4)));
-
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey((x, y) -> x + y);
-
- JavaTestUtils.attachTestOutputStream(reduced);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testCombineByKey() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(
- new Tuple2<>("california", 10),
- new Tuple2<>("new york", 4)));
-
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i,
- (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2));
-
- JavaTestUtils.attachTestOutputStream(combined);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testReduceByKeyAndWindow() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(new Tuple2<>("california", 14),
- new Tuple2<>("new york", 9)),
- Arrays.asList(new Tuple2<>("california", 10),
- new Tuple2<>("new york", 4)));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> reduceWindowed =
- pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000));
- JavaTestUtils.attachTestOutputStream(reduceWindowed);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testUpdateStateByKey() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(new Tuple2<>("california", 14),
- new Tuple2<>("new york", 9)),
- Arrays.asList(new Tuple2<>("california", 14),
- new Tuple2<>("new york", 9)));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
- int out = 0;
- if (state.isPresent()) {
- out = out + state.get();
- }
- for (Integer v : values) {
- out = out + v;
- }
- return Optional.of(out);
- });
-
- JavaTestUtils.attachTestOutputStream(updated);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testReduceByKeyAndWindowWithInverse() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(new Tuple2<>("california", 14),
- new Tuple2<>("new york", 9)),
- Arrays.asList(new Tuple2<>("california", 10),
- new Tuple2<>("new york", 4)));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> reduceWindowed =
- pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000),
- new Duration(1000));
- JavaTestUtils.attachTestOutputStream(reduceWindowed);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testPairTransform() {
- List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(3, 5),
- new Tuple2<>(1, 5),
- new Tuple2<>(4, 5),
- new Tuple2<>(2, 5)),
- Arrays.asList(
- new Tuple2<>(2, 5),
- new Tuple2<>(3, 5),
- new Tuple2<>(4, 5),
- new Tuple2<>(1, 5)));
-
- List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(1, 5),
- new Tuple2<>(2, 5),
- new Tuple2<>(3, 5),
- new Tuple2<>(4, 5)),
- Arrays.asList(
- new Tuple2<>(1, 5),
- new Tuple2<>(2, 5),
- new Tuple2<>(3, 5),
- new Tuple2<>(4, 5)));
-
- JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
-
- JavaTestUtils.attachTestOutputStream(sorted);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testPairToNormalRDDTransform() {
- List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(3, 5),
- new Tuple2<>(1, 5),
- new Tuple2<>(4, 5),
- new Tuple2<>(2, 5)),
- Arrays.asList(
- new Tuple2<>(2, 5),
- new Tuple2<>(3, 5),
- new Tuple2<>(4, 5),
- new Tuple2<>(1, 5)));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(3, 1, 4, 2),
- Arrays.asList(2, 3, 4, 1));
-
- JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(x -> x._1()));
- JavaTestUtils.attachTestOutputStream(firstParts);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testMapValues() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
-
- List<List<Tuple2<String, String>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "DODGERS"),
- new Tuple2<>("california", "GIANTS"),
- new Tuple2<>("new york", "YANKEES"),
- new Tuple2<>("new york", "METS")),
- Arrays.asList(new Tuple2<>("california", "SHARKS"),
- new Tuple2<>("california", "DUCKS"),
- new Tuple2<>("new york", "RANGERS"),
- new Tuple2<>("new york", "ISLANDERS")));
-
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase);
- JavaTestUtils.attachTestOutputStream(mapped);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testFlatMapValues() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
-
- List<List<Tuple2<String, String>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "dodgers1"),
- new Tuple2<>("california", "dodgers2"),
- new Tuple2<>("california", "giants1"),
- new Tuple2<>("california", "giants2"),
- new Tuple2<>("new york", "yankees1"),
- new Tuple2<>("new york", "yankees2"),
- new Tuple2<>("new york", "mets1"),
- new Tuple2<>("new york", "mets2")),
- Arrays.asList(new Tuple2<>("california", "sharks1"),
- new Tuple2<>("california", "sharks2"),
- new Tuple2<>("california", "ducks1"),
- new Tuple2<>("california", "ducks2"),
- new Tuple2<>("new york", "rangers1"),
- new Tuple2<>("new york", "rangers2"),
- new Tuple2<>("new york", "islanders1"),
- new Tuple2<>("new york", "islanders2")));
-
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, String> flatMapped =
- pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2"));
- JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
- Assert.assertEquals(expected, result);
- }
-
- /**
- * This test is only for testing the APIs. It's not necessary to run it.
- */
- public void testMapWithStateAPI() {
- JavaPairRDD<String, Boolean> initialRDD = null;
- JavaPairDStream<String, Integer> wordsDstream = null;
-
- JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
- wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double> function((time, key, value, state) -> {
- // Use all State's methods here
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return Optional.of(2.0);
- }).initialState(initialRDD)
- .numPartitions(10)
- .partitioner(new HashPartitioner(10))
- .timeout(Durations.seconds(10)));
-
- JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots();
-
- JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
- wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> {
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return 2.0;
- }).initialState(initialRDD)
- .numPartitions(10)
- .partitioner(new HashPartitioner(10))
- .timeout(Durations.seconds(10)));
-
- JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots();
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/resources/log4j.properties b/extras/java8-tests/src/test/resources/log4j.properties
deleted file mode 100644
index eb3b1999..0000000
--- a/extras/java8-tests/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-org.spark-project.jetty.LEVEL=WARN
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala b/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala
deleted file mode 100644
index fa0681d..0000000
--- a/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-/**
- * Test cases where JDK8-compiled Scala user code is used with Spark.
- */
-class JDK8ScalaSuite extends SparkFunSuite with SharedSparkContext {
- test("basic RDD closure test (SPARK-6152)") {
- sc.parallelize(1 to 1000).map(x => x * x).count()
- }
-}
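
The suite above boils down to one check: that a closure compiled by a JDK8 toolchain can be serialized and run on an RDD (SPARK-6152). A minimal standalone sketch of the same check, outside the SparkFunSuite/SharedSparkContext harness; the object name and the local[2] master are illustrative assumptions, not part of this patch:

    import org.apache.spark.{SparkConf, SparkContext}

    object ClosureSmokeTest {
      def main(args: Array[String]): Unit = {
        // local[2] is an assumed master for a quick, self-contained run.
        val conf = new SparkConf().setAppName("ClosureSmokeTest").setMaster("local[2]")
        val sc = new SparkContext(conf)
        try {
          // Squaring each element forces the lambda's closure to be serialized and shipped,
          // which is the behaviour SPARK-6152 exercised for JDK8-compiled bytecode.
          val n = sc.parallelize(1 to 1000).map(x => x * x).count()
          assert(n == 1000L)
        } finally {
          sc.stop()
        }
      }
    }
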
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml
deleted file mode 100644
index d1c38c7..0000000
--- a/extras/kinesis-asl-assembly/pom.xml
+++ /dev/null
@@ -1,181 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kinesis-asl-assembly_2.11</artifactId>
- <packaging>jar</packaging>
- <name>Spark Project Kinesis Assembly</name>
- <url>http://spark.apache.org/</url>
-
- <properties>
- <sbt.project.name>streaming-kinesis-asl-assembly</sbt.project.name>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <!--
- Dependencies below are already included in the Spark assembly, so they are demoted to 'provided' scope here.
- -->
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-server</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>net.java.dev.jets3t</groupId>
- <artifactId>jets3t</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-ipc</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- <classifier>${avro.mapred.classifier}</classifier>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
-
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <configuration>
- <shadedArtifactAttached>false</shadedArtifactAttached>
- <artifactSet>
- <includes>
- <include>*:*</include>
- </includes>
- </artifactSet>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
- <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
- <resource>reference.conf</resource>
- </transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
- <resource>log4j.properties</resource>
- </transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
-
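
With every dependency except spark-streaming-kinesis-asl itself marked provided, the shade plugin above should produce a single spark-streaming-kinesis-asl-assembly jar bundling the Kinesis receiver with its AWS client libraries. Its main consumer appears to be the Python word count example further down, which needs this jar passed to bin/spark-submit (via --jars) so the Kinesis Client Library classes are available to the driver and executors.
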
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/pom.xml
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml
deleted file mode 100644
index 935155e..0000000
--- a/extras/kinesis-asl/pom.xml
+++ /dev/null
@@ -1,87 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-~ Licensed to the Apache Software Foundation (ASF) under one or more
-~ contributor license agreements. See the NOTICE file distributed with
-~ this work for additional information regarding copyright ownership.
-~ The ASF licenses this file to You under the Apache License, Version 2.0
-~ (the "License"); you may not use this file except in compliance with
-~ the License. You may obtain a copy of the License at
-~
-~ http://www.apache.org/licenses/LICENSE-2.0
-~
-~ Unless required by applicable law or agreed to in writing, software
-~ distributed under the License is distributed on an "AS IS" BASIS,
-~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-~ See the License for the specific language governing permissions and
-~ limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <!-- Kinesis integration is not included by default due to ASL-licensed code. -->
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kinesis-asl_2.11</artifactId>
- <packaging>jar</packaging>
- <name>Spark Kinesis Integration</name>
-
- <properties>
- <sbt.project.name>streaming-kinesis-asl</sbt.project.name>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>amazon-kinesis-client</artifactId>
- <version>${aws.kinesis.client.version}</version>
- </dependency>
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>amazon-kinesis-producer</artifactId>
- <version>${aws.kinesis.producer.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- </build>
-</project>
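
Because the Kinesis Client Library is distributed under the Amazon Software License, this module is not part of the default build; it is only compiled when the kinesis-asl profile is enabled, e.g. build/mvn -Pkinesis-asl -DskipTests clean package (the profile name is assumed unchanged by this move).
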
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
deleted file mode 100644
index 5dc825d..0000000
--- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.examples.streaming;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import com.amazonaws.regions.RegionUtils;
-import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kinesis.KinesisUtils;
-
-import scala.Tuple2;
-
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-
-/**
- * Consumes messages from an Amazon Kinesis stream and does wordcount.
- *
- * This example spins up 1 Kinesis Receiver per shard for the given stream.
- * It then starts pulling from the last checkpointed sequence number of the given stream.
- *
- * Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url]
- * [app-name] is the name of the consumer app, used to track the read data in DynamoDB
- * [stream-name] name of the Kinesis stream (e.g. mySparkStream)
- * [endpoint-url] endpoint of the Kinesis service
- * (e.g. https://kinesis.us-east-1.amazonaws.com)
- *
- *
- * Example:
- * # export AWS keys if necessary
- * $ export AWS_ACCESS_KEY_ID=<your-access-key>
- * $ export AWS_SECRET_KEY=<your-secret-key>
- *
- * # run the example
- * $ SPARK_HOME/bin/run-example streaming.JavaKinesisWordCountASL myAppName mySparkStream \
- * https://kinesis.us-east-1.amazonaws.com
- *
- * There is a companion helper class called KinesisWordProducerASL which puts dummy data
- * onto the Kinesis stream.
- *
- * This code uses the DefaultAWSCredentialsProviderChain to find credentials
- * in the following order:
- * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- * Java System Properties - aws.accessKeyId and aws.secretKey
- * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
- * Instance profile credentials - delivered through the Amazon EC2 metadata service
- * For more information, see
- * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
- *
- * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
- * the Kinesis Spark Streaming integration.
- */
-public final class JavaKinesisWordCountASL { // needs to be public for access from run-example
- private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
- private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
-
- public static void main(String[] args) {
- // Check that all required args were passed in.
- if (args.length != 3) {
- System.err.println(
- "Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n\n" +
- " <app-name> is the name of the app, used to track the read data in DynamoDB\n" +
- " <stream-name> is the name of the Kinesis stream\n" +
- " <endpoint-url> is the endpoint of the Kinesis service\n" +
- " (e.g. https://kinesis.us-east-1.amazonaws.com)\n" +
- "Generate data for the Kinesis stream using the example KinesisWordProducerASL.\n" +
- "See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more\n" +
- "details.\n"
- );
- System.exit(1);
- }
-
- // Set default log4j logging level to WARN to hide Spark logs
- StreamingExamples.setStreamingLogLevels();
-
- // Populate the appropriate variables from the given args
- String kinesisAppName = args[0];
- String streamName = args[1];
- String endpointUrl = args[2];
-
- // Create a Kinesis client in order to determine the number of shards for the given stream
- AmazonKinesisClient kinesisClient =
- new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
- kinesisClient.setEndpoint(endpointUrl);
- int numShards =
- kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();
-
-
- // In this example, we create one Kinesis Receiver/input DStream for each shard.
- // This is not a requirement; if there are fewer receivers/DStreams than shards,
- // the shards will be distributed automatically among the receivers, and each receiver
- // will receive data from multiple shards.
- int numStreams = numShards;
-
- // Spark Streaming batch interval
- Duration batchInterval = new Duration(2000);
-
- // Kinesis checkpoint interval. Same as batchInterval for this example.
- Duration kinesisCheckpointInterval = batchInterval;
-
- // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
- // DynamoDB of the same region as the Kinesis stream
- String regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName();
-
- // Setup the Spark config and StreamingContext
- SparkConf sparkConfig = new SparkConf().setAppName("JavaKinesisWordCountASL");
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
-
- // Create the Kinesis DStreams
- List<JavaDStream<byte[]>> streamsList = new ArrayList<>(numStreams);
- for (int i = 0; i < numStreams; i++) {
- streamsList.add(
- KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
- InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2())
- );
- }
-
- // Union all the streams if there is more than 1 stream
- JavaDStream<byte[]> unionStreams;
- if (streamsList.size() > 1) {
- unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
- } else {
- // Otherwise, just use the 1 stream
- unionStreams = streamsList.get(0);
- }
-
- // Convert each line of Array[Byte] to String, and split into words
- JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
- @Override
- public Iterator<String> call(byte[] line) {
- String s = new String(line, StandardCharsets.UTF_8);
- return Arrays.asList(WORD_SEPARATOR.split(s)).iterator();
- }
- });
-
- // Map each word to a (word, 1) tuple so we can reduce by key to count the words
- JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<String, Integer>(s, 1);
- }
- }
- ).reduceByKey(
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- }
- );
-
- // Print the first 10 wordCounts
- wordCounts.print();
-
- // Start the streaming context and await termination
- jssc.start();
- jssc.awaitTermination();
- }
-}
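
For readers who prefer Scala, a minimal sketch of the same pipeline as the Java example above: one receiver per shard, union of the streams, split into words, count per batch. The object name is illustrative and the snippet is an untested sketch rather than the module's own Scala example, but the createStream call mirrors the Java invocation above.

    import java.nio.charset.StandardCharsets

    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
    import com.amazonaws.regions.RegionUtils
    import com.amazonaws.services.kinesis.AmazonKinesisClient
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Duration, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisUtils

    object KinesisWordCountSketch {
      def main(args: Array[String]): Unit = {
        val Array(appName, streamName, endpointUrl) = args

        // Same approach as the Java code: one DStream per shard, region derived from the endpoint.
        val client = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
        client.setEndpoint(endpointUrl)
        val numShards = client.describeStream(streamName).getStreamDescription.getShards.size
        val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName

        val batchInterval = new Duration(2000)
        val ssc = new StreamingContext(new SparkConf().setAppName(appName), batchInterval)

        val streams = (0 until numShards).map { _ =>
          KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.LATEST, batchInterval, StorageLevel.MEMORY_AND_DISK_2)
        }

        // Union the per-shard streams, decode the byte payloads, and count words per batch.
        val words = ssc.union(streams)
          .flatMap(bytes => new String(bytes, StandardCharsets.UTF_8).split(" "))
        words.map(word => (word, 1)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
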
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py b/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
deleted file mode 100644
index 51f8c5c..0000000
--- a/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
+++ /dev/null
@@ -1,83 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
- Consumes messages from an Amazon Kinesis stream and does wordcount.
-
- This example spins up 1 Kinesis Receiver per shard for the given stream.
- It then starts pulling from the last checkpointed sequence number of the given stream.
-
- Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>
- <app-name> is the name of the consumer app, used to track the read data in DynamoDB
- <stream-name> name of the Kinesis stream (e.g. mySparkStream)
- <endpoint-url> endpoint of the Kinesis service
- (e.g. https://kinesis.us-east-1.amazonaws.com)
- <region-name> region of the Kinesis endpoint (e.g. us-east-1)
-
-
- Example:
- # export AWS keys if necessary
- $ export AWS_ACCESS_KEY_ID=<your-access-key>
- $ export AWS_SECRET_KEY=<your-secret-key>
-
- # run the example
- $ bin/spark-submit --jars extras/kinesis-asl/target/scala-*/\
- spark-streaming-kinesis-asl-assembly_*.jar \
- extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
- myAppName mySparkStream https://kinesis.us-east-1.amazonaws.com us-east-1
-
- There is a companion helper class called KinesisWordProducerASL which puts dummy data
- onto the Kinesis stream.
-
- This code uses the DefaultAWSCredentialsProviderChain to find credentials
- in the following order:
- Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- Java System Properties - aws.accessKeyId and aws.secretKey
- Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
- Instance profile credentials - delivered through the Amazon EC2 metadata service
- For more information, see
- http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
-
- See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
- the Kinesis Spark Streaming integration.
-"""
-from __future__ import print_function
-
-import sys
-
-from pyspark import SparkContext
-from pyspark.streaming import StreamingContext
-from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
-
-if __name__ == "__main__":
- if len(sys.argv) != 5:
- print(
- "Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>",
- file=sys.stderr)
- sys.exit(-1)
-
- sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl")
- ssc = StreamingContext(sc, 1)
- appName, streamName, endpointUrl, regionName = sys.argv[1:]
- lines = KinesisUtils.createStream(
- ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2)
- counts = lines.flatMap(lambda line: line.split(" ")) \
- .map(lambda word: (word, 1)) \
- .reduceByKey(lambda a, b: a+b)
- counts.pprint()
-
- ssc.start()
- ssc.awaitTermination()
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/resources/log4j.properties b/extras/kinesis-asl/src/main/resources/log4j.properties
deleted file mode 100644
index 6cdc928..0000000
--- a/extras/kinesis-asl/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-log4j.rootCategory=WARN, console
-
-# File appender
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=false
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
-
-# Console appender
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.out
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
-
-# Settings to quiet third party logs that are too verbose
-log4j.logger.org.spark-project.jetty=WARN
-log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
-log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
-log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
\ No newline at end of file