You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/04/26 23:55:35 UTC
samza git commit: SAMZA-1026: HDFS System Producer should not have Kafka dependency
Repository: samza
Updated Branches:
refs/heads/master 35a5cd9ad -> d72e47d4d
SAMZA-1026: HDFS System Producer should not have Kafka dependency
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Navina Ramesh <nr...@linkedin.com>
Closes #144 from prateekm/hdfs-kafka-dependency
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d72e47d4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d72e47d4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d72e47d4
Branch: refs/heads/master
Commit: d72e47d4d7a1ea5acc817a6d88c1e5a823f4bfed
Parents: 35a5cd9
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Apr 26 16:55:25 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Wed Apr 26 16:55:25 2017 -0700
----------------------------------------------------------------------
build.gradle | 1 -
.../org/apache/samza/system/hdfs/HdfsConfig.scala | 2 +-
.../samza/system/hdfs/HdfsSystemFactory.scala | 18 ++++++++++++++----
.../samza/system/hdfs/HdfsSystemProducer.scala | 10 ++++------
4 files changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/d72e47d4/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index b6abd47..dc56077 100644
--- a/build.gradle
+++ b/build.gradle
@@ -512,7 +512,6 @@ project(":samza-hdfs_$scalaVersion") {
dependencies {
compile project(':samza-api')
compile project(":samza-core_$scalaVersion")
- compile project(":samza-kafka_$scalaVersion")
// currently hdfs system producer/consumer do depend on yarn for two things:
// 1. staging directory 2. security
// SAMZA-1032 to solve the staging directory dependency
http://git-wip-us.apache.org/repos/asf/samza/blob/d72e47d4/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
index 12c93ae..52e19bf 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
@@ -83,7 +83,7 @@ object HdfsConfig {
val STAGING_DIRECTORY = "systems.%s.stagingDirectory"
val STAGING_DIRECTORY_DEFAULT = ""
- implicit def Hdfs2Kafka(config: Config) = new HdfsConfig(config)
+ implicit def Config2Hdfs(config: Config) = new HdfsConfig(config)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d72e47d4/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
index 3673431..05d717a 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
@@ -20,11 +20,11 @@
package org.apache.samza.system.hdfs
-import org.apache.samza.config.Config
+import org.apache.samza.config.{Config, ConfigException, JobConfig}
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.SystemFactory
import org.apache.samza.system.hdfs.HdfsSystemConsumer.HdfsSystemConsumerMetrics
-import org.apache.samza.util.{KafkaUtil, Logging}
+import org.apache.samza.util.Logging
class HdfsSystemFactory extends SystemFactory with Logging {
@@ -33,8 +33,11 @@ class HdfsSystemFactory extends SystemFactory with Logging {
}
def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
- // TODO: SAMZA-1026: should remove Kafka dependency below
- val clientId = KafkaUtil.getClientId("samza-producer", config)
+ val jobConfig = new JobConfig(config)
+ val jobName = jobConfig.getName.getOrElse(throw new ConfigException("Missing job name."))
+ val jobId = jobConfig.getJobId.getOrElse("1")
+
+ val clientId = getClientId("samza-producer", jobName, jobId)
val metrics = new HdfsSystemProducerMetrics(systemName, registry)
new HdfsSystemProducer(systemName, clientId, config, metrics)
}
@@ -42,4 +45,11 @@ class HdfsSystemFactory extends SystemFactory with Logging {
def getAdmin(systemName: String, config: Config) = {
new HdfsSystemAdmin(systemName, config)
}
+
+ def getClientId(id: String, jobName: String, jobId: String): String = {
+ "%s-%s-%s" format
+ (id.replaceAll("[^A-Za-z0-9]", "_"),
+ jobName.replaceAll("[^A-Za-z0-9]", "_"),
+ jobId.replaceAll("[^A-Za-z0-9]", "_"))
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d72e47d4/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
index d2967ca..79bca5b 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
@@ -20,14 +20,12 @@
package org.apache.samza.system.hdfs
-import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
-import org.apache.samza.SamzaException
-import org.apache.samza.config.Config
-import org.apache.samza.system.hdfs.HdfsConfig._
-import org.apache.samza.system.{SystemProducer, OutgoingMessageEnvelope}
+import org.apache.hadoop.fs.FileSystem
import org.apache.samza.system.hdfs.writer.HdfsWriter
-import org.apache.samza.util.{Logging, ExponentialSleepStrategy, TimerUtils, KafkaUtil}
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemProducer}
+import org.apache.samza.util.{Logging, TimerUtils}
+
import scala.collection.mutable.{Map => MMap}