You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/05/19 20:47:59 UTC

samza git commit: SAMZA-950: Add backward-compatible KafkaSystemProducer constructor for Java clients

Repository: samza
Updated Branches:
  refs/heads/master 023a7ce23 -> 6ae7784a5


SAMZA-950: Add backward-compatible KafkaSystemProducer constructor for Java clients


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6ae7784a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6ae7784a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6ae7784a

Branch: refs/heads/master
Commit: 6ae7784a579d6aa310eb0eefdc75c40e4a13d4d7
Parents: 023a7ce
Author: Jacob Maes <ja...@gmail.com>
Authored: Thu May 19 13:47:22 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu May 19 13:47:22 2016 -0700

----------------------------------------------------------------------
 .../system/kafka/KafkaSystemProducer.scala      |  7 +++
 .../kafka/TestKafkaSystemProducerJava.java      | 58 ++++++++++++++++++++
 2 files changed, 65 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6ae7784a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index d1a7a9f..3769e10 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -49,6 +49,13 @@ class KafkaSystemProducer(systemName: String,
   var exceptionThrown: AtomicReference[Exception] = new AtomicReference[Exception]()
   val StreamNameNullOrEmptyErrorMsg = "Stream Name should be specified in the stream configuration file.";
 
+  // Backward-compatible constructor for Java clients
+  def this(systemName: String,
+           retryBackoff: ExponentialSleepStrategy,
+           getProducer: () => Producer[Array[Byte], Array[Byte]],
+           metrics: KafkaSystemProducerMetrics,
+           clock: () => Long) = this(systemName, retryBackoff, getProducer, metrics, clock, 30)
+
   def start() {
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6ae7784a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
new file mode 100644
index 0000000..04c9113
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.HashMap;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.util.ExponentialSleepStrategy;
+import org.junit.Test;
+import scala.runtime.AbstractFunction0;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+
+/**
+ * Tests the instantiation of a KafkaSystemProducer from Java clients
+ */
+public class TestKafkaSystemProducerJava {
+  @Test
+  public void testInstantiateProducer() {
+    KafkaSystemProducer ksp = new KafkaSystemProducer("SysName", new ExponentialSleepStrategy(2.0, 200, 10000),
+        new AbstractFunction0<Producer<byte[], byte[]>>() {
+          @Override
+          public Producer<byte[], byte[]> apply() {
+            return new KafkaProducer<>(new HashMap<String, Object>());
+          }
+        }, new KafkaSystemProducerMetrics("SysName", new MetricsRegistryMap()), new AbstractFunction0<Object>() {
+      @Override
+      public Object apply() {
+        return System.currentTimeMillis();
+      }
+    });
+
+    // Default value should have been used.
+    assertEquals(30, ksp.maxRetries());
+    long now = System.currentTimeMillis();
+    assertTrue((Long)ksp.clock().apply() >= now);
+  }
+}
\ No newline at end of file