You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/06/02 20:20:39 UTC
spark git commit: [SPARK-8015] [FLUME] Remove Guava dependency from
flume-sink.
Repository: spark
Updated Branches:
refs/heads/master 1bb5d716c -> 0071bd8d3
[SPARK-8015] [FLUME] Remove Guava dependency from flume-sink.
The minimal change would be to disable shading of Guava in the module,
and rely on the transitive dependency from other libraries instead. But
since Guava's use is so localized, I think it's better to just not use
it instead, so I replaced that code and removed all traces of Guava from
the module's build.
Author: Marcelo Vanzin <va...@cloudera.com>
Closes #6555 from vanzin/SPARK-8015 and squashes the following commits:
c0ceea8 [Marcelo Vanzin] Add comments about dependency management.
c38228d [Marcelo Vanzin] Add guava dep in test scope.
b7a0349 [Marcelo Vanzin] Add libthrift exclusion.
6e0942d [Marcelo Vanzin] Add comment in pom.
2d79260 [Marcelo Vanzin] [SPARK-8015] [flume] Remove Guava dependency from flume-sink.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0071bd8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0071bd8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0071bd8d
Branch: refs/heads/master
Commit: 0071bd8d31f13abfe73b9d141a818412d374dce0
Parents: 1bb5d71
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Jun 2 11:20:33 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Jun 2 11:20:33 2015 -0700
----------------------------------------------------------------------
external/flume-sink/pom.xml | 39 ++++++++++++++++++++
.../flume/sink/SparkAvroCallbackHandler.scala | 4 +-
.../flume/sink/SparkSinkThreadFactory.scala | 35 ++++++++++++++++++
.../streaming/flume/sink/SparkSinkSuite.scala | 6 +--
4 files changed, 77 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0071bd8d/external/flume-sink/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 1f3e619..71f2b6f 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -42,16 +42,47 @@
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
+ <exclusions>
+ <!-- Guava is excluded to avoid its use in this module. -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <!--
+ Exclude libthrift since the flume poms seem to confuse sbt, which fails to find the
+ dependency.
+ -->
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
+ <!-- Add Guava in test scope since flume actually needs it. -->
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<!--
Netty explicitly added in test as it has been excluded from
Flume dependency (to avoid runtime problems when running with
@@ -85,6 +116,14 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <!-- Disable all relocations defined in the parent pom. -->
+ <relocations combine.self="override" />
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/spark/blob/0071bd8d/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index fd01807..dc2a4ab 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
-import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.flume.Channel
import org.apache.commons.lang3.RandomStringUtils
@@ -45,8 +44,7 @@ import org.apache.commons.lang3.RandomStringUtils
private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
- new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Spark Sink Processor Thread - %d").build()))
+ new SparkSinkThreadFactory("Spark Sink Processor Thread - %d")))
// Protected by `sequenceNumberToProcessor`
private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
// This sink will not persist sequence numbers and reuses them if it gets restarted.
http://git-wip-us.apache.org/repos/asf/spark/blob/0071bd8d/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
new file mode 100644
index 0000000..845fc8d
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.util.concurrent.ThreadFactory
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * Thread factory that generates daemon threads with a specified name format.
+ */
+private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {
+
+ private val threadId = new AtomicLong()
+
+ override def newThread(r: Runnable): Thread = {
+ val t = new Thread(r, nameFormat.format(threadId.incrementAndGet()))
+ t.setDaemon(true)
+ t
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/0071bd8d/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
index 605b3fe..fa43629 100644
--- a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
+++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
-import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.Context
@@ -194,9 +193,8 @@ class SparkSinkSuite extends FunSuite {
count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
(1 to count).map(_ => {
- lazy val channelFactoryExecutor =
- Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
- setNameFormat("Flume Receiver Channel Thread - %d").build())
+ lazy val channelFactoryExecutor = Executors.newCachedThreadPool(
+ new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d"))
lazy val channelFactory =
new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
val transceiver = new NettyTransceiver(address, channelFactory)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org