Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/08 22:02:49 UTC

spark git commit: [SPARK-4819] Remove Guava's "Optional" from public API

Repository: spark
Updated Branches:
  refs/heads/master 553fd7b91 -> 659fd9d04


[SPARK-4819] Remove Guava's "Optional" from public API

Replace Guava's `Optional` in the public API with an API clone of Java 8's `java.util.Optional` (edit: one that also clones the essential methods of Guava's `Optional`)
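
For user code, the visible change is an import swap; the Guava-style factory and accessor methods keep working on the new class. A minimal before/after sketch (hedged; variable names are illustrative only, not from the patch):

    // Before: Guava's Optional leaked into Spark's public Java API
    //   import com.google.common.base.Optional;
    // After this commit: Spark's own work-alike class
    import org.apache.spark.api.java.Optional;

    Optional<Integer> state = Optional.fromNullable(null); // Guava-style factory
    int guavaStyle = state.or(0);                          // Guava-style accessor
    int java8Style = state.orElse(0);                      // java.util.Optional-style accessor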

See also https://github.com/apache/spark/pull/10512

Author: Sean Owen <so...@cloudera.com>

Closes #10513 from srowen/SPARK-4819.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/659fd9d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/659fd9d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/659fd9d0

Branch: refs/heads/master
Commit: 659fd9d04b988d48960eac4f352ca37066f43f5c
Parents: 553fd7b
Author: Sean Owen <so...@cloudera.com>
Authored: Fri Jan 8 13:02:30 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Jan 8 13:02:30 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/Optional.java     | 187 +++++++++++++++++++
 .../org/apache/spark/api/java/JavaPairRDD.scala |   2 -
 .../org/apache/spark/api/java/JavaRDDLike.scala |   4 -
 .../spark/api/java/JavaSparkContext.scala       |   1 -
 .../org/apache/spark/api/java/JavaUtils.scala   |   9 +-
 .../java/org/apache/spark/JavaAPISuite.java     |  46 ++---
 .../apache/spark/api/java/OptionalSuite.java    |  94 ++++++++++
 docs/streaming-programming-guide.md             |   1 -
 .../streaming/JavaStatefulNetworkWordCount.java |  20 +-
 .../java/org/apache/spark/Java8APISuite.java    |   2 +-
 .../apache/spark/streaming/Java8APISuite.java   |   1 -
 network/common/pom.xml                          |   6 -
 pom.xml                                         |  11 --
 project/MimaExcludes.scala                      |  11 +-
 .../org/apache/spark/streaming/StateSpec.scala  |  12 +-
 .../streaming/api/java/JavaPairDStream.scala    |   3 +-
 .../apache/spark/streaming/JavaAPISuite.java    |   2 +-
 .../spark/streaming/JavaMapWithStateSuite.java  |   4 +-
 .../tools/JavaAPICompletenessChecker.scala      |   2 +-
 19 files changed, 333 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/core/src/main/java/org/apache/spark/api/java/Optional.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/Optional.java b/core/src/main/java/org/apache/spark/api/java/Optional.java
new file mode 100644
index 0000000..ca7babc
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/Optional.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+import java.io.Serializable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>Like {@code java.util.Optional} in Java 8, {@code scala.Option} in Scala, and
+ * {@code com.google.common.base.Optional} in Google Guava, this class represents a
+ * value of a given type that may or may not exist. It is used in methods that wish
+ * to optionally return a value, in preference to returning {@code null}.</p>
+ *
+ * <p>In fact, the class here is a reimplementation of the essential API of both
+ * {@code java.util.Optional} and {@code com.google.common.base.Optional}. From
+ * {@code java.util.Optional}, it implements:</p>
+ *
+ * <ul>
+ *   <li>{@link #empty()}</li>
+ *   <li>{@link #of(Object)}</li>
+ *   <li>{@link #ofNullable(Object)}</li>
+ *   <li>{@link #get()}</li>
+ *   <li>{@link #orElse(Object)}</li>
+ *   <li>{@link #isPresent()}</li>
+ * </ul>
+ *
+ * <p>From {@code com.google.common.base.Optional} it implements:</p>
+ *
+ * <ul>
+ *   <li>{@link #absent()}</li>
+ *   <li>{@link #of(Object)}</li>
+ *   <li>{@link #fromNullable(Object)}</li>
+ *   <li>{@link #get()}</li>
+ *   <li>{@link #or(Object)}</li>
+ *   <li>{@link #orNull()}</li>
+ *   <li>{@link #isPresent()}</li>
+ * </ul>
+ *
+ * <p>{@code java.util.Optional} itself is not used at this time because the
+ * project does not require Java 8. Using {@code com.google.common.base.Optional}
+ * has in the past caused serious library version conflicts with Guava that can't
+ * be resolved by shading. Hence this work-alike clone.</p>
+ *
+ * @param <T> type of value held inside
+ */
+public final class Optional<T> implements Serializable {
+
+  private static final Optional<?> EMPTY = new Optional<>();
+
+  private final T value;
+
+  private Optional() {
+    this.value = null;
+  }
+
+  private Optional(T value) {
+    Preconditions.checkNotNull(value);
+    this.value = value;
+  }
+
+  // java.util.Optional API (subset)
+
+  /**
+   * @return an empty {@code Optional}
+   */
+  public static <T> Optional<T> empty() {
+    @SuppressWarnings("unchecked")
+    Optional<T> t = (Optional<T>) EMPTY;
+    return t;
+  }
+
+  /**
+   * @param value non-null value to wrap
+   * @return {@code Optional} wrapping this value
+   * @throws NullPointerException if value is null
+   */
+  public static <T> Optional<T> of(T value) {
+    return new Optional<>(value);
+  }
+
+  /**
+   * @param value value to wrap, which may be null
+   * @return {@code Optional} wrapping this value, which may be empty
+   */
+  public static <T> Optional<T> ofNullable(T value) {
+    if (value == null) {
+      return empty();
+    } else {
+      return of(value);
+    }
+  }
+
+  /**
+   * @return the value wrapped by this {@code Optional}
+   * @throws NullPointerException if this is empty (contains no value)
+   */
+  public T get() {
+    Preconditions.checkNotNull(value);
+    return value;
+  }
+
+  /**
+   * @param other value to return if this is empty
+   * @return this {@code Optional}'s value if present, or else the given value
+   */
+  public T orElse(T other) {
+    return value != null ? value : other;
+  }
+
+  /**
+   * @return true iff this {@code Optional} contains a value (non-empty)
+   */
+  public boolean isPresent() {
+    return value != null;
+  }
+
+  // Guava API (subset)
+  // of(), get() and isPresent() are identically present in the Guava API
+
+  /**
+   * @return an empty {@code Optional}
+   */
+  public static <T> Optional<T> absent() {
+    return empty();
+  }
+
+  /**
+   * @param value value to wrap, which may be null
+   * @return {@code Optional} wrapping this value, which may be empty
+   */
+  public static <T> Optional<T> fromNullable(T value) {
+    return ofNullable(value);
+  }
+
+  /**
+   * @param other value to return if this is empty
+   * @return this {@code Optional}'s value if present, or else the given value
+   */
+  public T or(T other) {
+    return value != null ? value : other;
+  }
+
+  /**
+   * @return this {@code Optional}'s value if present, or else null
+   */
+  public T orNull() {
+    return value;
+  }
+
+  // Common methods
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof Optional)) {
+      return false;
+    }
+    Optional<?> other = (Optional<?>) obj;
+    return value == null ? other.value == null : value.equals(other.value);
+  }
+
+  @Override
+  public int hashCode() {
+    return value == null ? 0 : value.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return value == null ? "Optional.empty" : String.format("Optional[%s]", value);
+  }
+
+}
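
A short, hedged usage sketch of the class above, exercising both the java.util-style and the Guava-style subsets it implements (assumes only the new class itself):

    Optional<String> present = Optional.of("spark");
    Optional<String> empty = Optional.empty();     // equivalently: Optional.absent()

    present.get();                                 // "spark"
    present.isPresent();                           // true
    empty.orElse("none");                          // "none" (java.util style)
    empty.or("none");                              // "none" (Guava style)
    empty.orNull();                                // null
    Optional.ofNullable(null).isPresent();         // false; fromNullable(null) behaves the same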

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 59af105..fb04472 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
-import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{JobConf, OutputFormat}
@@ -655,7 +654,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * keys; this also retains the original RDD's partitioning.
    */
   def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
-    import scala.collection.JavaConverters._
     def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala
     implicit val ctag: ClassTag[U] = fakeClassTag
     fromRDD(rdd.flatMapValues(fn))

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 2424382..0f8d13c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -24,7 +24,6 @@ import java.util.{Comparator, Iterator => JIterator, List => JList}
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
-import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec
 
 import org.apache.spark._
@@ -122,7 +121,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    *  RDD, and then flattening the results.
    */
   def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
-    import scala.collection.JavaConverters._
     def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
     JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
@@ -132,7 +130,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    *  RDD, and then flattening the results.
    */
   def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
-    import scala.collection.JavaConverters._
     def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
     new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
   }
@@ -142,7 +139,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    *  RDD, and then flattening the results.
    */
   def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-    import scala.collection.JavaConverters._
     def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
     def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
     JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 9990b22..01433ca 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
-import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.{InputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
index b2a4d05..f820401 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -22,13 +22,12 @@ import java.util.Map.Entry
 
 import scala.collection.mutable
 
-import com.google.common.base.Optional
-
 private[spark] object JavaUtils {
   def optionToOptional[T](option: Option[T]): Optional[T] =
-    option match {
-      case Some(value) => Optional.of(value)
-      case None => Optional.absent()
+    if (option.isDefined) {
+      Optional.of(option.get)
+    } else {
+      Optional.empty[T]
     }
 
   // Workaround for SPARK-3926 / SI-8911

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 47382e4..44d5cac 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -21,7 +21,17 @@ import java.io.*;
 import java.nio.channels.FileChannel;
 import java.nio.ByteBuffer;
 import java.net.URI;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.*;
 
 import scala.Tuple2;
@@ -35,7 +45,6 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.base.Throwables;
-import com.google.common.base.Optional;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import org.apache.hadoop.io.IntWritable;
@@ -49,7 +58,12 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.input.PortableDataStream;
 import org.apache.spark.partial.BoundedDouble;
@@ -1785,32 +1799,6 @@ public class JavaAPISuite implements Serializable {
     Assert.assertTrue(future.isDone());
   }
 
-
-  /**
-   * Test for SPARK-3647. This test needs to use the maven-built assembly to trigger the issue,
-   * since that's the only artifact where Guava classes have been relocated.
-   */
-  @Test
-  public void testGuavaOptional() {
-    // Stop the context created in setUp() and start a local-cluster one, to force usage of the
-    // assembly.
-    sc.stop();
-    JavaSparkContext localCluster = new JavaSparkContext("local-cluster[1,1,1024]", "JavaAPISuite");
-    try {
-      JavaRDD<Integer> rdd1 = localCluster.parallelize(Arrays.asList(1, 2, null), 3);
-      JavaRDD<Optional<Integer>> rdd2 = rdd1.map(
-        new Function<Integer, Optional<Integer>>() {
-          @Override
-          public Optional<Integer> call(Integer i) {
-            return Optional.fromNullable(i);
-          }
-        });
-      rdd2.collect();
-    } finally {
-      localCluster.stop();
-    }
-  }
-
   static class Class1 {}
   static class Class2 {}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/core/src/test/java/org/apache/spark/api/java/OptionalSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/api/java/OptionalSuite.java b/core/src/test/java/org/apache/spark/api/java/OptionalSuite.java
new file mode 100644
index 0000000..4b97c18
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/api/java/OptionalSuite.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests {@link Optional}.
+ */
+public class OptionalSuite {
+
+  @Test
+  public void testEmpty() {
+    Assert.assertFalse(Optional.empty().isPresent());
+    Assert.assertNull(Optional.empty().orNull());
+    Assert.assertEquals("foo", Optional.empty().or("foo"));
+    Assert.assertEquals("foo", Optional.empty().orElse("foo"));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testEmptyGet() {
+    Optional.empty().get();
+  }
+
+  @Test
+  public void testAbsent() {
+    Assert.assertFalse(Optional.absent().isPresent());
+    Assert.assertNull(Optional.absent().orNull());
+    Assert.assertEquals("foo", Optional.absent().or("foo"));
+    Assert.assertEquals("foo", Optional.absent().orElse("foo"));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testAbsentGet() {
+    Optional.absent().get();
+  }
+
+  @Test
+  public void testOf() {
+    Assert.assertTrue(Optional.of(1).isPresent());
+    Assert.assertNotNull(Optional.of(1).orNull());
+    Assert.assertEquals(Integer.valueOf(1), Optional.of(1).get());
+    Assert.assertEquals(Integer.valueOf(1), Optional.of(1).or(2));
+    Assert.assertEquals(Integer.valueOf(1), Optional.of(1).orElse(2));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testOfWithNull() {
+    Optional.of(null);
+  }
+
+  @Test
+  public void testOfNullable() {
+    Assert.assertTrue(Optional.ofNullable(1).isPresent());
+    Assert.assertNotNull(Optional.ofNullable(1).orNull());
+    Assert.assertEquals(Integer.valueOf(1), Optional.ofNullable(1).get());
+    Assert.assertEquals(Integer.valueOf(1), Optional.ofNullable(1).or(2));
+    Assert.assertEquals(Integer.valueOf(1), Optional.ofNullable(1).orElse(2));
+    Assert.assertFalse(Optional.ofNullable(null).isPresent());
+    Assert.assertNull(Optional.ofNullable(null).orNull());
+    Assert.assertEquals(Integer.valueOf(2), Optional.<Integer>ofNullable(null).or(2));
+    Assert.assertEquals(Integer.valueOf(2), Optional.<Integer>ofNullable(null).orElse(2));
+  }
+
+  @Test
+  public void testFromNullable() {
+    Assert.assertTrue(Optional.fromNullable(1).isPresent());
+    Assert.assertNotNull(Optional.fromNullable(1).orNull());
+    Assert.assertEquals(Integer.valueOf(1), Optional.fromNullable(1).get());
+    Assert.assertEquals(Integer.valueOf(1), Optional.fromNullable(1).or(2));
+    Assert.assertEquals(Integer.valueOf(1), Optional.fromNullable(1).orElse(2));
+    Assert.assertFalse(Optional.fromNullable(null).isPresent());
+    Assert.assertNull(Optional.fromNullable(null).orNull());
+    Assert.assertEquals(Integer.valueOf(2), Optional.<Integer>fromNullable(null).or(2));
+    Assert.assertEquals(Integer.valueOf(2), Optional.<Integer>fromNullable(null).orElse(2));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 1edc0fe..8fd075d 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -881,7 +881,6 @@ Scala code, take a look at the example
 <div data-lang="java" markdown="1">
 
 {% highlight java %}
-import com.google.common.base.Optional;
 Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
   new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
     @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
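
For context, the surrounding guide wires this function into updateStateByKey; a hedged sketch of that call, assuming pairs is the JavaPairDStream<String, Integer> built earlier in the guide:

    JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);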

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index 14997c6..f52cc7c 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -23,17 +23,14 @@ import java.util.regex.Pattern;
 
 import scala.Tuple2;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.Optional;
 import org.apache.spark.api.java.StorageLevels;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.State;
 import org.apache.spark.streaming.StateSpec;
-import org.apache.spark.streaming.Time;
 import org.apache.spark.streaming.api.java.*;
 
 /**
@@ -67,8 +64,8 @@ public class JavaStatefulNetworkWordCount {
 
     // Initial state RDD input to mapWithState
     @SuppressWarnings("unchecked")
-    List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<String, Integer>("hello", 1),
-            new Tuple2<String, Integer>("world", 1));
+    List<Tuple2<String, Integer>> tuples =
+        Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
     JavaPairRDD<String, Integer> initialRDD = ssc.sparkContext().parallelizePairs(tuples);
 
     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
@@ -77,7 +74,7 @@ public class JavaStatefulNetworkWordCount {
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
       public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
+        return Arrays.asList(SPACE.split(x));
       }
     });
 
@@ -85,18 +82,17 @@ public class JavaStatefulNetworkWordCount {
         new PairFunction<String, String, Integer>() {
           @Override
           public Tuple2<String, Integer> call(String s) {
-            return new Tuple2<String, Integer>(s, 1);
+            return new Tuple2<>(s, 1);
           }
         });
 
     // Update the cumulative count function
-    final Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
+    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
         new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
-
           @Override
           public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
-            int sum = one.or(0) + (state.exists() ? state.get() : 0);
-            Tuple2<String, Integer> output = new Tuple2<String, Integer>(word, sum);
+            int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
+            Tuple2<String, Integer> output = new Tuple2<>(word, sum);
             state.update(sum);
             return output;
           }
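
Later in this example (outside the hunk above) the mapping function is passed to mapWithState; a hedged sketch of that wiring, assuming wordsDstream is the JavaPairDStream produced by the mapToPair call shown earlier and initialRDD is the initial state RDD from the same file:

    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
        wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));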

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
index 1497526..27d494c 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -24,7 +24,6 @@ import java.util.*;
 import scala.Tuple2;
 
 import com.google.common.collect.Iterables;
-import com.google.common.base.Optional;
 import com.google.common.io.Files;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -38,6 +37,7 @@ import org.apache.spark.api.java.JavaDoubleRDD;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.util.Utils;
 

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index e8a0dfc..604d818 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -22,7 +22,6 @@ import java.util.*;
 
 import scala.Tuple2;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/network/common/pom.xml
----------------------------------------------------------------------
diff --git a/network/common/pom.xml b/network/common/pom.xml
index 32c34c6..92ca004 100644
--- a/network/common/pom.xml
+++ b/network/common/pom.xml
@@ -52,15 +52,9 @@
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>
-    <!--
-      Promote Guava to "compile" so that maven-shade-plugin picks it up (for packaging the Optional
-      class exposed in the Java API). The plugin will then remove this dependency from the published
-      pom, so that Guava does not pollute the client's compilation classpath.
-    -->
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <scope>compile</scope>
     </dependency>
 
     <!-- Test dependencies -->

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e414a8b..9c975a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2251,17 +2251,6 @@
             <relocation>
               <pattern>com.google.common</pattern>
               <shadedPattern>org.spark-project.guava</shadedPattern>
-              <excludes>
-                <!--
-                  These classes cannot be relocated, because the Java API exposes the
-                  "Optional" type; the others are referenced by the Optional class.
-                -->
-                <exclude>com/google/common/base/Absent*</exclude>
-                <exclude>com/google/common/base/Function</exclude>
-                <exclude>com/google/common/base/Optional*</exclude>
-                <exclude>com/google/common/base/Present*</exclude>
-                <exclude>com/google/common/base/Supplier</exclude>
-              </excludes>
             </relocation>
           </relocations>
         </configuration>

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 40559a0..0d5f938 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -57,7 +57,16 @@ object MimaExcludes {
       ) ++ Seq(
         ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory")
-        ) ++
+      ) ++
+      Seq(
+        // SPARK-4819 replace Guava Optional
+        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.api.java.JavaSparkContext.getCheckpointDir"),
+        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.api.java.JavaSparkContext.getSparkHome"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.getCheckpointFile"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitioner"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.getCheckpointFile"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitioner")
+      ) ++
       Seq(
         // SPARK-12481 Remove Hadoop 1.x
         ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.mapred.SparkHadoopMapRedUtil"),

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
index 0b09455..f1114c1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
@@ -17,11 +17,9 @@
 
 package org.apache.spark.streaming
 
-import com.google.common.base.Optional
-
 import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
+import org.apache.spark.api.java.{JavaPairRDD, JavaUtils, Optional}
 import org.apache.spark.api.java.function.{Function3 => JFunction3, Function4 => JFunction4}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.ClosureCleaner
@@ -200,7 +198,11 @@ object StateSpec {
     StateSpec[KeyType, ValueType, StateType, MappedType] = {
     val wrappedFunc = (time: Time, k: KeyType, v: Option[ValueType], s: State[StateType]) => {
       val t = mappingFunction.call(time, k, JavaUtils.optionToOptional(v), s)
-      Option(t.orNull)
+      if (t.isPresent) {
+        Some(t.get)
+      } else {
+        None
+      }
     }
     StateSpec.function(wrappedFunc)
   }
@@ -220,7 +222,7 @@ object StateSpec {
       mappingFunction: JFunction3[KeyType, Optional[ValueType], State[StateType], MappedType]):
     StateSpec[KeyType, ValueType, StateType, MappedType] = {
     val wrappedFunc = (k: KeyType, v: Option[ValueType], s: State[StateType]) => {
-      mappingFunction.call(k, Optional.fromNullable(v.get), s)
+      mappingFunction.call(k, Optional.ofNullable(v.get), s)
     }
     StateSpec.function(wrappedFunc)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index af0d84b..d718f1d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -25,14 +25,13 @@ import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
-import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.{JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 
 import org.apache.spark.Partitioner
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaUtils}
+import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaUtils, Optional}
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index ddc56fc..4dbcef2 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.base.Optional;
 import com.google.common.io.Files;
 import com.google.common.collect.Sets;
 
@@ -43,6 +42,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.api.java.*;

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index 20e2a1c..9b77010 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -26,7 +26,6 @@ import java.util.Set;
 
 import scala.Tuple2;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
@@ -38,6 +37,7 @@ import org.junit.Test;
 
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.Optional;
 import org.apache.spark.api.java.function.Function3;
 import org.apache.spark.api.java.function.Function4;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -139,7 +139,7 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
         new Function3<String, Optional<Integer>, State<Integer>, Integer>() {
           @Override
           public Integer call(String key, Optional<Integer> value, State<Integer> state) {
-            int sum = value.or(0) + (state.exists() ? state.get() : 0);
+            int sum = value.orElse(0) + (state.exists() ? state.get() : 0);
             state.update(sum);
             return sum;
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/659fd9d0/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
index 6fb7184..ccd8fd3 100644
--- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
@@ -161,7 +161,7 @@ object JavaAPICompletenessChecker {
               }
             case "scala.Option" => {
               if (isReturnType) {
-                ParameterizedType("com.google.common.base.Optional", parameters.map(applySubs))
+                ParameterizedType("org.apache.spark.api.java.Optional", parameters.map(applySubs))
               } else {
                 applySubs(parameters(0))
               }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org