You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text copy.)
Posted to commits@samza.apache.org by na...@apache.org on 2017/04/26 23:55:35 UTC

samza git commit: SAMZA-1026: HDFS System Producer should not have Kafka dependency

Repository: samza
Updated Branches:
  refs/heads/master 35a5cd9ad -> d72e47d4d


SAMZA-1026: HDFS System Producer should not have Kafka dependency

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Navina Ramesh <nr...@linkedin.com>

Closes #144 from prateekm/hdfs-kafka-dependency


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d72e47d4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d72e47d4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d72e47d4

Branch: refs/heads/master
Commit: d72e47d4d7a1ea5acc817a6d88c1e5a823f4bfed
Parents: 35a5cd9
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Apr 26 16:55:25 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Wed Apr 26 16:55:25 2017 -0700

----------------------------------------------------------------------
 build.gradle                                      |  1 -
 .../org/apache/samza/system/hdfs/HdfsConfig.scala |  2 +-
 .../samza/system/hdfs/HdfsSystemFactory.scala     | 18 ++++++++++++++----
 .../samza/system/hdfs/HdfsSystemProducer.scala    | 10 ++++------
 4 files changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d72e47d4/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index b6abd47..dc56077 100644
--- a/build.gradle
+++ b/build.gradle
@@ -512,7 +512,6 @@ project(":samza-hdfs_$scalaVersion") {
   dependencies {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")
-    compile project(":samza-kafka_$scalaVersion")
     // currently hdfs system producer/consumer do depend on yarn for two things:
     // 1. staging directory 2. security
     // SAMZA-1032 to solve the staging directory dependency

http://git-wip-us.apache.org/repos/asf/samza/blob/d72e47d4/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
index 12c93ae..52e19bf 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
@@ -83,7 +83,7 @@ object HdfsConfig {
   val STAGING_DIRECTORY = "systems.%s.stagingDirectory"
   val STAGING_DIRECTORY_DEFAULT = ""
 
-  implicit def Hdfs2Kafka(config: Config) = new HdfsConfig(config)
+  implicit def Config2Hdfs(config: Config) = new HdfsConfig(config)
 
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d72e47d4/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
index 3673431..05d717a 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
@@ -20,11 +20,11 @@
 package org.apache.samza.system.hdfs
 
 
-import org.apache.samza.config.Config
+import org.apache.samza.config.{Config, ConfigException, JobConfig}
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.system.hdfs.HdfsSystemConsumer.HdfsSystemConsumerMetrics
-import org.apache.samza.util.{KafkaUtil, Logging}
+import org.apache.samza.util.Logging
 
 
 class HdfsSystemFactory extends SystemFactory with Logging {
@@ -33,8 +33,11 @@ class HdfsSystemFactory extends SystemFactory with Logging {
   }
 
   def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
-    // TODO: SAMZA-1026: should remove Kafka dependency below
-    val clientId = KafkaUtil.getClientId("samza-producer", config)
+    val jobConfig = new JobConfig(config)
+    val jobName = jobConfig.getName.getOrElse(throw new ConfigException("Missing job name."))
+    val jobId = jobConfig.getJobId.getOrElse("1")
+
+    val clientId = getClientId("samza-producer", jobName, jobId)
     val metrics = new HdfsSystemProducerMetrics(systemName, registry)
     new HdfsSystemProducer(systemName, clientId, config, metrics)
   }
@@ -42,4 +45,11 @@ class HdfsSystemFactory extends SystemFactory with Logging {
   def getAdmin(systemName: String, config: Config) = {
     new HdfsSystemAdmin(systemName, config)
   }
+
+  def getClientId(id: String, jobName: String, jobId: String): String = {
+    "%s-%s-%s" format
+      (id.replaceAll("[^A-Za-z0-9]", "_"),
+        jobName.replaceAll("[^A-Za-z0-9]", "_"),
+        jobId.replaceAll("[^A-Za-z0-9]", "_"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/d72e47d4/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
index d2967ca..79bca5b 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
@@ -20,14 +20,12 @@
 package org.apache.samza.system.hdfs
 
 
-import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.conf.Configuration
-import org.apache.samza.SamzaException
-import org.apache.samza.config.Config
-import org.apache.samza.system.hdfs.HdfsConfig._
-import org.apache.samza.system.{SystemProducer, OutgoingMessageEnvelope}
+import org.apache.hadoop.fs.FileSystem
 import org.apache.samza.system.hdfs.writer.HdfsWriter
-import org.apache.samza.util.{Logging, ExponentialSleepStrategy, TimerUtils, KafkaUtil}
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemProducer}
+import org.apache.samza.util.{Logging, TimerUtils}
+
 import scala.collection.mutable.{Map => MMap}