You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2020/12/16 06:12:27 UTC
[atlas] 01/03: ATLAS-4063: EmbeddedKafkaServer simplification
This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 1907f6408c4c499a0cc7b4bf016571a4b752a7f8
Author: Andras Katona <ak...@cloudera.com>
AuthorDate: Mon Dec 7 09:25:42 2020 +0100
ATLAS-4063: EmbeddedKafkaServer simplification
Signed-off-by: nixonrodrigues <ni...@apache.org>
---
.../apache/atlas/kafka/EmbeddedKafkaServer.java | 37 ++--------------------
1 file changed, 2 insertions(+), 35 deletions(-)
diff --git a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
index 235b7ce..b793b9a 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
@@ -17,7 +17,6 @@
*/
package org.apache.atlas.kafka;
-import kafka.metrics.KafkaMetricsReporter;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.atlas.ApplicationProperties;
@@ -35,7 +34,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import scala.Option;
-import scala.collection.mutable.Buffer;
+import scala.collection.mutable.ArrayBuffer;
import javax.inject.Inject;
import java.io.File;
@@ -45,7 +44,6 @@ import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.*;
-import java.util.concurrent.TimeUnit;
@Component
@@ -138,10 +136,7 @@ public class EmbeddedKafkaServer implements Service {
brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
- List<KafkaMetricsReporter> metrics = new ArrayList<>();
- Buffer<KafkaMetricsReporter> metricsReporters = scala.collection.JavaConversions.asScalaBuffer(metrics);
-
- kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), new SystemTime(), Option.apply(this.getClass().getName()), metricsReporters);
+ kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), Time.SYSTEM, Option.apply(this.getClass().getName()), new ArrayBuffer<>());
kafkaServer.startup();
@@ -165,32 +160,4 @@ public class EmbeddedKafkaServer implements Service {
return new URL("http://" + url);
}
}
-
-
- // ----- inner class : SystemTime ----------------------------------------
- private static class SystemTime implements Time {
- @Override
- public long milliseconds() {
- return System.currentTimeMillis();
- }
-
- @Override
- public long nanoseconds() {
- return System.nanoTime();
- }
-
- @Override
- public long hiResClockMs() {
- return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
- }
-
- @Override
- public void sleep(long arg0) {
- try {
- Thread.sleep(arg0);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
}