You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2020/12/16 06:12:27 UTC

[atlas] 01/03: ATLAS-4063: EmbeddedKafkaServer simplification

This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 1907f6408c4c499a0cc7b4bf016571a4b752a7f8
Author: Andras Katona <ak...@cloudera.com>
AuthorDate: Mon Dec 7 09:25:42 2020 +0100

    ATLAS-4063: EmbeddedKafkaServer simplification
    
    Signed-off-by: nixonrodrigues <ni...@apache.org>
---
 .../apache/atlas/kafka/EmbeddedKafkaServer.java    | 37 ++--------------------
 1 file changed, 2 insertions(+), 35 deletions(-)

diff --git a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
index 235b7ce..b793b9a 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
@@ -17,7 +17,6 @@
  */
 package org.apache.atlas.kafka;
 
-import kafka.metrics.KafkaMetricsReporter;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import org.apache.atlas.ApplicationProperties;
@@ -35,7 +34,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
 import scala.Option;
-import scala.collection.mutable.Buffer;
+import scala.collection.mutable.ArrayBuffer;
 
 import javax.inject.Inject;
 import java.io.File;
@@ -45,7 +44,6 @@ import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
 
 
 @Component
@@ -138,10 +136,7 @@ public class EmbeddedKafkaServer implements Service {
         brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
         brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
 
-        List<KafkaMetricsReporter>   metrics          = new ArrayList<>();
-        Buffer<KafkaMetricsReporter> metricsReporters = scala.collection.JavaConversions.asScalaBuffer(metrics);
-
-        kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), new SystemTime(), Option.apply(this.getClass().getName()), metricsReporters);
+        kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), Time.SYSTEM, Option.apply(this.getClass().getName()), new ArrayBuffer<>());
 
         kafkaServer.startup();
 
@@ -165,32 +160,4 @@ public class EmbeddedKafkaServer implements Service {
             return new URL("http://" + url);
         }
     }
-
-
-    // ----- inner class : SystemTime ----------------------------------------
-    private static class SystemTime implements Time {
-        @Override
-        public long milliseconds() {
-            return System.currentTimeMillis();
-        }
-
-        @Override
-        public long nanoseconds() {
-            return System.nanoTime();
-        }
-
-        @Override
-        public long hiResClockMs() {
-            return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
-        }
-
-        @Override
-        public void sleep(long arg0) {
-            try {
-                Thread.sleep(arg0);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
 }