You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/05/19 20:47:59 UTC
samza git commit: SAMZA-950: Add backward-compatible
KafkaSystemProducer constructor for Java clients
Repository: samza
Updated Branches:
refs/heads/master 023a7ce23 -> 6ae7784a5
SAMZA-950: Add backward-compatible KafkaSystemProducer constructor for Java clients
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6ae7784a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6ae7784a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6ae7784a
Branch: refs/heads/master
Commit: 6ae7784a579d6aa310eb0eefdc75c40e4a13d4d7
Parents: 023a7ce
Author: Jacob Maes <ja...@gmail.com>
Authored: Thu May 19 13:47:22 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu May 19 13:47:22 2016 -0700
----------------------------------------------------------------------
.../system/kafka/KafkaSystemProducer.scala | 7 +++
.../kafka/TestKafkaSystemProducerJava.java | 58 ++++++++++++++++++++
2 files changed, 65 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/6ae7784a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index d1a7a9f..3769e10 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -49,6 +49,13 @@ class KafkaSystemProducer(systemName: String,
var exceptionThrown: AtomicReference[Exception] = new AtomicReference[Exception]()
val StreamNameNullOrEmptyErrorMsg = "Stream Name should be specified in the stream configuration file.";
+ // Backward-compatible constructor for Java clients
+ def this(systemName: String,
+ retryBackoff: ExponentialSleepStrategy,
+ getProducer: () => Producer[Array[Byte], Array[Byte]],
+ metrics: KafkaSystemProducerMetrics,
+ clock: () => Long) = this(systemName, retryBackoff, getProducer, metrics, clock, 30)
+
def start() {
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6ae7784a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
new file mode 100644
index 0000000..04c9113
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.HashMap;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.util.ExponentialSleepStrategy;
+import org.junit.Test;
+import scala.runtime.AbstractFunction0;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+
+/**
+ * Tests the instantiation of a KafkaSystemProducer from Java clients
+ */
+public class TestKafkaSystemProducerJava {
+ @Test
+ public void testInstantiateProducer() {
+ KafkaSystemProducer ksp = new KafkaSystemProducer("SysName", new ExponentialSleepStrategy(2.0, 200, 10000),
+ new AbstractFunction0<Producer<byte[], byte[]>>() {
+ @Override
+ public Producer<byte[], byte[]> apply() {
+ return new KafkaProducer<>(new HashMap<String, Object>());
+ }
+ }, new KafkaSystemProducerMetrics("SysName", new MetricsRegistryMap()), new AbstractFunction0<Object>() {
+ @Override
+ public Object apply() {
+ return System.currentTimeMillis();
+ }
+ });
+
+ // Default value should have been used.
+ assertEquals(30, ksp.maxRetries());
+ long now = System.currentTimeMillis();
+ assertTrue((Long)ksp.clock().apply() >= now);
+ }
+}
\ No newline at end of file