You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by mi...@apache.org on 2016/08/21 08:42:06 UTC

[16/24] logging-log4j2 git commit: Use Log4jThread to name the thread.

Use Log4jThread to name the thread.

Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/f1b61bdd
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/f1b61bdd
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/f1b61bdd

Branch: refs/heads/LOG4J2-1528
Commit: f1b61bddf640ecc5d2c80ea190ff7332577cb787
Parents: 1aa3c3e
Author: Gary Gregory <gg...@apache.org>
Authored: Sat Aug 20 17:30:16 2016 -0700
Committer: Gary Gregory <gg...@apache.org>
Committed: Sat Aug 20 17:30:16 2016 -0700

----------------------------------------------------------------------
 .../core/appender/mom/kafka/KafkaManager.java   | 185 +++++++++----------
 1 file changed, 92 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f1b61bdd/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
index 4e4a09c..d535e02 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -1,93 +1,92 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-
-package org.apache.logging.log4j.core.appender.mom.kafka;
-
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.logging.log4j.core.appender.AbstractManager;
-import org.apache.logging.log4j.core.config.Property;
-import org.apache.logging.log4j.core.util.Log4jThread;
-
-public class KafkaManager extends AbstractManager {
-
-    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
-
-    /**
-     * package-private access for testing.
-     */
-    static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
-
-    private final Properties config = new Properties();
-    private Producer<byte[], byte[]> producer = null;
-    private final int timeoutMillis;
-
-    private final String topic;
-
-    public KafkaManager(final String name, final String topic, final Property[] properties) {
-        super(name);
-        this.topic = topic;
-        config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-        config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-        config.setProperty("batch.size", "0");
-        for (final Property property : properties) {
-            config.setProperty(property.getName(), property.getValue());
-        }
-        this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
-    }
-
-    @Override
-    public void releaseSub() {
-        if (producer != null) {
-            // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660
-            final Thread closeThread = new Log4jThread(new Runnable() {
-                @Override
-                public void run() {
-                    producer.close();
-                }
-            });
-            closeThread.setName("KafkaManager-CloseThread");
-            closeThread.setDaemon(true); // avoid blocking JVM shutdown
-            closeThread.start();
-            try {
-                closeThread.join(timeoutMillis);
-            } catch (final InterruptedException ignore) {
-                // ignore
-            }
-        }
-    }
-
-    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
-        if (producer != null) {
-            producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
-        }
-    }
-
-    public void startup() {
-        producer = producerFactory.newKafkaProducer(config);
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.util.Log4jThread;
+
+public class KafkaManager extends AbstractManager {
+
+    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
+
+    /**
+     * package-private access for testing.
+     */
+    static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
+
+    private final Properties config = new Properties();
+    private Producer<byte[], byte[]> producer = null;
+    private final int timeoutMillis;
+
+    private final String topic;
+
+    public KafkaManager(final String name, final String topic, final Property[] properties) {
+        super(name);
+        this.topic = topic;
+        config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        config.setProperty("batch.size", "0");
+        for (final Property property : properties) {
+            config.setProperty(property.getName(), property.getValue());
+        }
+        this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
+    }
+
+    @Override
+    public void releaseSub() {
+        if (producer != null) {
+            // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660
+            final Thread closeThread = new Log4jThread(new Runnable() {
+                @Override
+                public void run() {
+                    producer.close();
+                }
+            }, "KafkaManager-CloseThread");
+            closeThread.setDaemon(true); // avoid blocking JVM shutdown
+            closeThread.start();
+            try {
+                closeThread.join(timeoutMillis);
+            } catch (final InterruptedException ignore) {
+                // ignore
+            }
+        }
+    }
+
+    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
+        if (producer != null) {
+            producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    public void startup() {
+        producer = producerFactory.newKafkaProducer(config);
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+}