You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/05 00:16:01 UTC

incubator-gobblin git commit: [GOBBLIN-421] Add parameterized type for Pusher message type

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ca5835b16 -> 8636b0cca


[GOBBLIN-421] Add parameterized type for Pusher message type

Closes #2298 from htran1/pusher_message_type


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/8636b0cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/8636b0cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/8636b0cc

Branch: refs/heads/master
Commit: 8636b0ccabb517c9783287a7a902e3881a878141
Parents: ca5835b
Author: Hung Tran <hu...@linkedin.com>
Authored: Sun Mar 4 16:15:50 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Sun Mar 4 16:15:50 2018 -0800

----------------------------------------------------------------------
 .../metrics/kafka/KafkaProducerPusher.java      |  2 +-
 .../gobblin/metrics/kafka/KafkaPusher.java      |  2 +-
 .../metrics/kafka/KafkaProducerPusher.java      |  2 +-
 .../gobblin/metrics/kafka/LoggingPusher.java    | 68 ++++++++++++++++++++
 .../gobblin/metrics/kafka/NoopPusher.java       | 47 ++++++++++++++
 .../apache/gobblin/metrics/kafka/Pusher.java    |  4 +-
 .../metrics/kafka/LoggingPusherTest.java        | 64 ++++++++++++++++++
 .../metrics/reporter/MockKafkaPusher.java       |  2 +-
 8 files changed, 185 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
index ff75a92..d83cc36 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -37,7 +37,7 @@ import org.apache.gobblin.util.ConfigUtils;
 /**
  * Establishes a connection to a Kafka cluster and push byte messages to a specified topic.
  */
-public class KafkaProducerPusher implements Pusher {
+public class KafkaProducerPusher implements Pusher<byte[]> {
 
   private final String topic;
   private final KafkaProducer<String, byte[]> producer;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java
index 1c977ff..b32899c 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java
@@ -33,7 +33,7 @@ import kafka.producer.ProducerConfig;
 /**
  * Establishes a connection to a Kafka cluster and pushed byte messages to a specified topic.
  */
-public class KafkaPusher implements Pusher {
+public class KafkaPusher implements Pusher<byte[]> {
 
   private final String topic;
   private final ProducerCloseable<String, byte[]> producer;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
index 3d2de9b..52d416b 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -37,7 +37,7 @@ import org.apache.gobblin.util.ConfigUtils;
 /**
  * Establish a connection to a Kafka cluster and push byte messages to a specified topic.
  */
-public class KafkaProducerPusher implements Pusher {
+public class KafkaProducerPusher implements Pusher<byte[]> {
 
   private final String topic;
   private final KafkaProducer<String, byte[]> producer;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java
new file mode 100644
index 0000000..b86287e
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * This is a {@link Pusher} class that logs the messages
+ * @param <M> message type
+ */
+@Slf4j
+public class LoggingPusher<M> implements Pusher<M> {
+  private final String brokers;
+  private final String topic;
+  private static final String KAFKA_TOPIC = "kafka.topic";
+  private static final String NO_BROKERS = "NoBrokers";
+  private static final String NO_TOPIC = "NoTopic";
+
+  public LoggingPusher() {
+    this(NO_BROKERS, NO_TOPIC, Optional.absent());
+  }
+
+  public LoggingPusher(Config config) {
+    this.brokers = ConfigUtils.getString(config, ConfigurationKeys.KAFKA_BROKERS, NO_BROKERS);
+    this.topic = ConfigUtils.getString(config, KAFKA_TOPIC, NO_TOPIC);
+  }
+
+  /**
+   * Constructor like the one in KafkaProducerPusher for compatibility
+   */
+  public LoggingPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
+    this.brokers = brokers;
+    this.topic = topic;
+  }
+
+  public void pushMessages(List<M> messages) {
+    for (M message: messages) {
+      log.info("Pushing to {}:{}: {}", this.brokers, this.topic, message.toString());
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/NoopPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/NoopPusher.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/NoopPusher.java
new file mode 100644
index 0000000..2c1edd5
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/NoopPusher.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * This is a {@link Pusher} class that ignores the messages
+ * @param <M> message type
+ */
+@Slf4j
+public class NoopPusher<M> implements Pusher<M> {
+  public NoopPusher() {}
+
+  public NoopPusher(Config config) {}
+
+  /**
+   * Constructor like the one in KafkaProducerPusher for compatibility
+   */
+  public NoopPusher(String brokers, String topic, Optional<Config> kafkaConfig) {}
+
+  public void pushMessages(List<M> messages) {}
+
+  @Override
+  public void close() throws IOException {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
index 5abd503..9024a88 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
@@ -24,10 +24,10 @@ import java.util.List;
 /**
  * Establish a connection to a Kafka cluster and push byte messages to a specified topic.
  */
-public interface Pusher extends Closeable {
+public interface Pusher<M> extends Closeable {
   /**
    * Push all byte array messages to the Kafka topic.
    * @param messages List of byte array messages to push to Kakfa.
    */
-  void pushMessages(List<byte[]> messages);
+  void pushMessages(List<M> messages);
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
new file mode 100644
index 0000000..3e861de
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+
+
+@Test
+public class LoggingPusherTest {
+
+  @Test
+  public void testKafkaReporter() {
+
+    TestAppender testAppender = new TestAppender();
+    Logger logger = LogManager.getLogger(LoggingPusher.class.getName());
+    logger.addAppender(testAppender);
+
+    LoggingPusher<String> loggingPusher = new LoggingPusher<String>("broker", "topic", Optional.absent());
+
+    loggingPusher.pushMessages(ImmutableList.of("message1", "message2"));
+
+    Assert.assertEquals(testAppender.events.size(), 2);
+    Assert.assertEquals(testAppender.events.get(0).getRenderedMessage(), "Pushing to broker:topic: message1");
+    Assert.assertEquals(testAppender.events.get(1).getRenderedMessage(), "Pushing to broker:topic: message2");
+
+    logger.removeAppender(testAppender);
+  }
+
+
+  private class TestAppender extends AppenderSkeleton {
+    List<LoggingEvent> events = new ArrayList<LoggingEvent>();
+    public void close() {}
+    public boolean requiresLayout() {return false;}
+    @Override
+    protected void append(LoggingEvent event) {
+      events.add(event);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java
index 71decbb..8bb827f 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java
@@ -31,7 +31,7 @@ import org.apache.gobblin.metrics.kafka.Pusher;
 /**
  * Mock instance of {@link org.apache.gobblin.metrics.kafka.Pusher} used for testing.
  */
-public class MockKafkaPusher implements Pusher {
+public class MockKafkaPusher implements Pusher<byte[]> {
 
   Queue<byte[]> messages = Queues.newLinkedBlockingQueue();