You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/06/02 20:20:39 UTC

spark git commit: [SPARK-8015] [FLUME] Remove Guava dependency from flume-sink.

Repository: spark
Updated Branches:
  refs/heads/master 1bb5d716c -> 0071bd8d3


[SPARK-8015] [FLUME] Remove Guava dependency from flume-sink.

The minimal change would be to disable shading of Guava in the module,
and rely on the transitive dependency from other libraries instead. But
since Guava's use is so localized, I think it's better to just not use
it instead, so I replaced that code and removed all traces of Guava from
the module's build.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #6555 from vanzin/SPARK-8015 and squashes the following commits:

c0ceea8 [Marcelo Vanzin] Add comments about dependency management.
c38228d [Marcelo Vanzin] Add guava dep in test scope.
b7a0349 [Marcelo Vanzin] Add libthrift exclusion.
6e0942d [Marcelo Vanzin] Add comment in pom.
2d79260 [Marcelo Vanzin] [SPARK-8015] [flume] Remove Guava dependency from flume-sink.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0071bd8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0071bd8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0071bd8d

Branch: refs/heads/master
Commit: 0071bd8d31f13abfe73b9d141a818412d374dce0
Parents: 1bb5d71
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Jun 2 11:20:33 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Jun 2 11:20:33 2015 -0700

----------------------------------------------------------------------
 external/flume-sink/pom.xml                     | 39 ++++++++++++++++++++
 .../flume/sink/SparkAvroCallbackHandler.scala   |  4 +-
 .../flume/sink/SparkSinkThreadFactory.scala     | 35 ++++++++++++++++++
 .../streaming/flume/sink/SparkSinkSuite.scala   |  6 +--
 4 files changed, 77 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0071bd8d/external/flume-sink/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 1f3e619..71f2b6f 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -42,16 +42,47 @@
     <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-sdk</artifactId>
+      <exclusions>
+        <!-- Guava is excluded to avoid its use in this module. -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <!--
+          Exclude libthrift since the flume poms seem to confuse sbt, which fails to find the
+          dependency.
+        -->
+        <exclusion>
+          <groupId>org.apache.thrift</groupId>
+          <artifactId>libthrift</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.thrift</groupId>
+          <artifactId>libthrift</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
     </dependency>
     <dependency>
+      <!-- Add Guava in test scope since flume actually needs it. -->
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <!--
         Netty explicitly added in test as it has been excluded from
         Flume dependency (to avoid runtime problems when running with
@@ -85,6 +116,14 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <!-- Disable all relocations defined in the parent pom. -->
+          <relocations combine.self="override" />
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/0071bd8d/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index fd01807..dc2a4ab 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.flume.Channel
 import org.apache.commons.lang3.RandomStringUtils
 
@@ -45,8 +44,7 @@ import org.apache.commons.lang3.RandomStringUtils
 private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
   val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
   val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
-    new ThreadFactoryBuilder().setDaemon(true)
-      .setNameFormat("Spark Sink Processor Thread - %d").build()))
+    new SparkSinkThreadFactory("Spark Sink Processor Thread - %d")))
   // Protected by `sequenceNumberToProcessor`
   private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
   // This sink will not persist sequence numbers and reuses them if it gets restarted.

http://git-wip-us.apache.org/repos/asf/spark/blob/0071bd8d/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
new file mode 100644
index 0000000..845fc8d
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.util.concurrent.ThreadFactory
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * Thread factory that generates daemon threads with a specified name format.
+ */
+private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {
+
+  private val threadId = new AtomicLong()
+
+  override def newThread(r: Runnable): Thread = {
+    val t = new Thread(r, nameFormat.format(threadId.incrementAndGet()))
+    t.setDaemon(true)
+    t
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0071bd8d/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
index 605b3fe..fa43629 100644
--- a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
+++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConversions._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.avro.ipc.NettyTransceiver
 import org.apache.avro.ipc.specific.SpecificRequestor
 import org.apache.flume.Context
@@ -194,9 +193,8 @@ class SparkSinkSuite extends FunSuite {
     count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
 
     (1 to count).map(_ => {
-      lazy val channelFactoryExecutor =
-        Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
-          setNameFormat("Flume Receiver Channel Thread - %d").build())
+      lazy val channelFactoryExecutor = Executors.newCachedThreadPool(
+        new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d"))
       lazy val channelFactory =
         new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
       val transceiver = new NettyTransceiver(address, channelFactory)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org