You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by gg...@apache.org on 2016/08/21 00:30:20 UTC

logging-log4j2 git commit: Use Log4jThread to name the thread.

Repository: logging-log4j2
Updated Branches:
  refs/heads/master 1aa3c3e24 -> f1b61bddf


Use Log4jThread to name the thread.

Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/f1b61bdd
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/f1b61bdd
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/f1b61bdd

Branch: refs/heads/master
Commit: f1b61bddf640ecc5d2c80ea190ff7332577cb787
Parents: 1aa3c3e
Author: Gary Gregory <gg...@apache.org>
Authored: Sat Aug 20 17:30:16 2016 -0700
Committer: Gary Gregory <gg...@apache.org>
Committed: Sat Aug 20 17:30:16 2016 -0700

----------------------------------------------------------------------
 .../core/appender/mom/kafka/KafkaManager.java   | 185 +++++++++----------
 1 file changed, 92 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f1b61bdd/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
index 4e4a09c..d535e02 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -1,93 +1,92 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-
-package org.apache.logging.log4j.core.appender.mom.kafka;
-
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.logging.log4j.core.appender.AbstractManager;
-import org.apache.logging.log4j.core.config.Property;
-import org.apache.logging.log4j.core.util.Log4jThread;
-
-public class KafkaManager extends AbstractManager {
-
-    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
-
-    /**
-     * package-private access for testing.
-     */
-    static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
-
-    private final Properties config = new Properties();
-    private Producer<byte[], byte[]> producer = null;
-    private final int timeoutMillis;
-
-    private final String topic;
-
-    public KafkaManager(final String name, final String topic, final Property[] properties) {
-        super(name);
-        this.topic = topic;
-        config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-        config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-        config.setProperty("batch.size", "0");
-        for (final Property property : properties) {
-            config.setProperty(property.getName(), property.getValue());
-        }
-        this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
-    }
-
-    @Override
-    public void releaseSub() {
-        if (producer != null) {
-            // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660
-            final Thread closeThread = new Log4jThread(new Runnable() {
-                @Override
-                public void run() {
-                    producer.close();
-                }
-            });
-            closeThread.setName("KafkaManager-CloseThread");
-            closeThread.setDaemon(true); // avoid blocking JVM shutdown
-            closeThread.start();
-            try {
-                closeThread.join(timeoutMillis);
-            } catch (final InterruptedException ignore) {
-                // ignore
-            }
-        }
-    }
-
-    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
-        if (producer != null) {
-            producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
-        }
-    }
-
-    public void startup() {
-        producer = producerFactory.newKafkaProducer(config);
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.logging.log4j.core.appender.mom.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.util.Log4jThread;
+
+public class KafkaManager extends AbstractManager {
+
+    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
+
+    /**
+     * package-private access for testing.
+     */
+    static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
+
+    private final Properties config = new Properties();
+    private Producer<byte[], byte[]> producer = null;
+    private final int timeoutMillis;
+
+    private final String topic;
+
+    public KafkaManager(final String name, final String topic, final Property[] properties) {
+        super(name);
+        this.topic = topic;
+        config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        config.setProperty("batch.size", "0");
+        for (final Property property : properties) {
+            config.setProperty(property.getName(), property.getValue());
+        }
+        this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
+    }
+
+    @Override
+    public void releaseSub() {
+        if (producer != null) {
+            // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660
+            final Thread closeThread = new Log4jThread(new Runnable() {
+                @Override
+                public void run() {
+                    producer.close();
+                }
+            }, "KafkaManager-CloseThread");
+            closeThread.setDaemon(true); // avoid blocking JVM shutdown
+            closeThread.start();
+            try {
+                closeThread.join(timeoutMillis);
+            } catch (final InterruptedException ignore) {
+                // ignore
+            }
+        }
+    }
+
+    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
+        if (producer != null) {
+            producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    public void startup() {
+        producer = producerFactory.newKafkaProducer(config);
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+}


Re: logging-log4j2 git commit: Use Log4jThread to name the thread.

Posted by Gary Gregory <ga...@gmail.com>.
I thought Git did the conversion since I installed it that way (check out
as Windows EOL and commit as Unix EOL)... hm... but yes I use Eclipse...
and EGit which has "core autocrlf" set to true in my user settings. On the
command line "git config --global core.autocrlf" says true.

What's going on here?

Gary

On Sat, Aug 20, 2016 at 5:33 PM, Remko Popma <re...@gmail.com> wrote:

> Gary, can you please check your IDE settings for line endings?
> It is really difficult to see what changed if the whole file is marked as
> modified in the commit mail.
>
> I suspect this is because your IDE converts the unix line endings to
> windows CRLF or something.
>
> On Sun, Aug 21, 2016 at 9:30 AM, <gg...@apache.org> wrote:
>
>> Repository: logging-log4j2
>> Updated Branches:
>>   refs/heads/master 1aa3c3e24 -> f1b61bddf
>>
>>
>> Use Log4jThread to name the thread.
>>
>> Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit
>> /f1b61bdd
>> Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/f1b61bdd
>> Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/f1b61bdd
>>
>> Branch: refs/heads/master
>> Commit: f1b61bddf640ecc5d2c80ea190ff7332577cb787
>> Parents: 1aa3c3e
>> Author: Gary Gregory <gg...@apache.org>
>> Authored: Sat Aug 20 17:30:16 2016 -0700
>> Committer: Gary Gregory <gg...@apache.org>
>> Committed: Sat Aug 20 17:30:16 2016 -0700
>>
>> ----------------------------------------------------------------------
>>  .../core/appender/mom/kafka/KafkaManager.java   | 185
>> +++++++++----------
>>  1 file changed, 92 insertions(+), 93 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f
>> 1b61bdd/log4j-core/src/main/java/org/apache/logging/log4j/co
>> re/appender/mom/kafka/KafkaManager.java
>> ----------------------------------------------------------------------
>> diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>> ender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org
>> /apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
>> index 4e4a09c..d535e02 100644
>> --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>> ender/mom/kafka/KafkaManager.java
>> +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>> ender/mom/kafka/KafkaManager.java
>> @@ -1,93 +1,92 @@
>> -/*
>> - * Licensed to the Apache Software Foundation (ASF) under one or more
>> - * contributor license agreements. See the NOTICE file distributed with
>> - * this work for additional information regarding copyright ownership.
>> - * The ASF licenses this file to You under the Apache license, Version
>> 2.0
>> - * (the "License"); you may not use this file except in compliance with
>> - * the License. You may obtain a copy of the License at
>> - *
>> - *      http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> - * See the license for the specific language governing permissions and
>> - * limitations under the license.
>> - */
>> -
>> -package org.apache.logging.log4j.core.appender.mom.kafka;
>> -
>> -import java.util.Properties;
>> -import java.util.concurrent.ExecutionException;
>> -import java.util.concurrent.TimeUnit;
>> -import java.util.concurrent.TimeoutException;
>> -
>> -import org.apache.kafka.clients.producer.Producer;
>> -import org.apache.kafka.clients.producer.ProducerRecord;
>> -import org.apache.logging.log4j.core.appender.AbstractManager;
>> -import org.apache.logging.log4j.core.config.Property;
>> -import org.apache.logging.log4j.core.util.Log4jThread;
>> -
>> -public class KafkaManager extends AbstractManager {
>> -
>> -    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
>> -
>> -    /**
>> -     * package-private access for testing.
>> -     */
>> -    static KafkaProducerFactory producerFactory = new
>> DefaultKafkaProducerFactory();
>> -
>> -    private final Properties config = new Properties();
>> -    private Producer<byte[], byte[]> producer = null;
>> -    private final int timeoutMillis;
>> -
>> -    private final String topic;
>> -
>> -    public KafkaManager(final String name, final String topic, final
>> Property[] properties) {
>> -        super(name);
>> -        this.topic = topic;
>> -        config.setProperty("key.serializer",
>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>> -        config.setProperty("value.serializer",
>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>> -        config.setProperty("batch.size", "0");
>> -        for (final Property property : properties) {
>> -            config.setProperty(property.getName(), property.getValue());
>> -        }
>> -        this.timeoutMillis = Integer.parseInt(config.getProperty("
>> timeout.ms", DEFAULT_TIMEOUT_MILLIS));
>> -    }
>> -
>> -    @Override
>> -    public void releaseSub() {
>> -        if (producer != null) {
>> -            // This thread is a workaround for this Kafka issue:
>> https://issues.apache.org/jira/browse/KAFKA-1660
>> -            final Thread closeThread = new Log4jThread(new Runnable() {
>> -                @Override
>> -                public void run() {
>> -                    producer.close();
>> -                }
>> -            });
>> -            closeThread.setName("KafkaManager-CloseThread");
>> -            closeThread.setDaemon(true); // avoid blocking JVM shutdown
>> -            closeThread.start();
>> -            try {
>> -                closeThread.join(timeoutMillis);
>> -            } catch (final InterruptedException ignore) {
>> -                // ignore
>> -            }
>> -        }
>> -    }
>> -
>> -    public void send(final byte[] msg) throws ExecutionException,
>> InterruptedException, TimeoutException {
>> -        if (producer != null) {
>> -            producer.send(new ProducerRecord<byte[], byte[]>(topic,
>> msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
>> -        }
>> -    }
>> -
>> -    public void startup() {
>> -        producer = producerFactory.newKafkaProducer(config);
>> -    }
>> -
>> -    public String getTopic() {
>> -        return topic;
>> -    }
>> -
>> -}
>> +/*
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements. See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache license, Version
>> 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License. You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the license for the specific language governing permissions and
>> + * limitations under the license.
>> + */
>> +
>> +package org.apache.logging.log4j.core.appender.mom.kafka;
>> +
>> +import java.util.Properties;
>> +import java.util.concurrent.ExecutionException;
>> +import java.util.concurrent.TimeUnit;
>> +import java.util.concurrent.TimeoutException;
>> +
>> +import org.apache.kafka.clients.producer.Producer;
>> +import org.apache.kafka.clients.producer.ProducerRecord;
>> +import org.apache.logging.log4j.core.appender.AbstractManager;
>> +import org.apache.logging.log4j.core.config.Property;
>> +import org.apache.logging.log4j.core.util.Log4jThread;
>> +
>> +public class KafkaManager extends AbstractManager {
>> +
>> +    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
>> +
>> +    /**
>> +     * package-private access for testing.
>> +     */
>> +    static KafkaProducerFactory producerFactory = new
>> DefaultKafkaProducerFactory();
>> +
>> +    private final Properties config = new Properties();
>> +    private Producer<byte[], byte[]> producer = null;
>> +    private final int timeoutMillis;
>> +
>> +    private final String topic;
>> +
>> +    public KafkaManager(final String name, final String topic, final
>> Property[] properties) {
>> +        super(name);
>> +        this.topic = topic;
>> +        config.setProperty("key.serializer",
>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>> +        config.setProperty("value.serializer",
>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>> +        config.setProperty("batch.size", "0");
>> +        for (final Property property : properties) {
>> +            config.setProperty(property.getName(), property.getValue());
>> +        }
>> +        this.timeoutMillis = Integer.parseInt(config.getProperty("
>> timeout.ms", DEFAULT_TIMEOUT_MILLIS));
>> +    }
>> +
>> +    @Override
>> +    public void releaseSub() {
>> +        if (producer != null) {
>> +            // This thread is a workaround for this Kafka issue:
>> https://issues.apache.org/jira/browse/KAFKA-1660
>> +            final Thread closeThread = new Log4jThread(new Runnable() {
>> +                @Override
>> +                public void run() {
>> +                    producer.close();
>> +                }
>> +            }, "KafkaManager-CloseThread");
>> +            closeThread.setDaemon(true); // avoid blocking JVM shutdown
>> +            closeThread.start();
>> +            try {
>> +                closeThread.join(timeoutMillis);
>> +            } catch (final InterruptedException ignore) {
>> +                // ignore
>> +            }
>> +        }
>> +    }
>> +
>> +    public void send(final byte[] msg) throws ExecutionException,
>> InterruptedException, TimeoutException {
>> +        if (producer != null) {
>> +            producer.send(new ProducerRecord<byte[], byte[]>(topic,
>> msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
>> +        }
>> +    }
>> +
>> +    public void startup() {
>> +        producer = producerFactory.newKafkaProducer(config);
>> +    }
>> +
>> +    public String getTopic() {
>> +        return topic;
>> +    }
>> +
>> +}
>>
>>
>


-- 
E-Mail: garydgregory@gmail.com | ggregory@apache.org
Java Persistence with Hibernate, Second Edition
<http://www.manning.com/bauer3/>
JUnit in Action, Second Edition <http://www.manning.com/tahchiev/>
Spring Batch in Action <http://www.manning.com/templier/>
Blog: http://garygregory.wordpress.com
Home: http://garygregory.com/
Tweet! http://twitter.com/GaryGregory

Re: logging-log4j2 git commit: Use Log4jThread to name the thread.

Posted by Gary Gregory <ga...@gmail.com>.
If you use the command line, call:

git config --global core.autocrlf true

I'm not sure if EGit picks that up automagically, but mine says Core ->
autocrlf -> true

Gary

On Sat, Aug 20, 2016 at 5:35 PM, Matt Sicker <bo...@gmail.com> wrote:

> Ideally, we should just do an all at once conversion to unix line endings
> or windows line endings to all branches. Then the line ending fixes won't
> come in anymore. We could add an EditorConfig file to enforce it
> automatically in IDEs even.
>
> On 20 August 2016 at 19:33, Remko Popma <re...@gmail.com> wrote:
>
>> Gary, can you please check your IDE settings for line endings?
>> It is really difficult to see what changed if the whole file is marked as
>> modified in the commit mail.
>>
>> I suspect this is because your IDE converts the unix line endings to
>> windows CRLF or something.
>>
>> On Sun, Aug 21, 2016 at 9:30 AM, <gg...@apache.org> wrote:
>>
>>> Repository: logging-log4j2
>>> Updated Branches:
>>>   refs/heads/master 1aa3c3e24 -> f1b61bddf
>>>
>>>
>>> Use Log4jThread to name the thread.
>>>
>>> Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
>>> Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit
>>> /f1b61bdd
>>> Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/f
>>> 1b61bdd
>>> Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/f
>>> 1b61bdd
>>>
>>> Branch: refs/heads/master
>>> Commit: f1b61bddf640ecc5d2c80ea190ff7332577cb787
>>> Parents: 1aa3c3e
>>> Author: Gary Gregory <gg...@apache.org>
>>> Authored: Sat Aug 20 17:30:16 2016 -0700
>>> Committer: Gary Gregory <gg...@apache.org>
>>> Committed: Sat Aug 20 17:30:16 2016 -0700
>>>
>>> ----------------------------------------------------------------------
>>>  .../core/appender/mom/kafka/KafkaManager.java   | 185
>>> +++++++++----------
>>>  1 file changed, 92 insertions(+), 93 deletions(-)
>>> ----------------------------------------------------------------------
>>>
>>>
>>> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f
>>> 1b61bdd/log4j-core/src/main/java/org/apache/logging/log4j/co
>>> re/appender/mom/kafka/KafkaManager.java
>>> ----------------------------------------------------------------------
>>> diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>>> ender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org
>>> /apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
>>> index 4e4a09c..d535e02 100644
>>> --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>>> ender/mom/kafka/KafkaManager.java
>>> +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>>> ender/mom/kafka/KafkaManager.java
>>> @@ -1,93 +1,92 @@
>>> -/*
>>> - * Licensed to the Apache Software Foundation (ASF) under one or more
>>> - * contributor license agreements. See the NOTICE file distributed with
>>> - * this work for additional information regarding copyright ownership.
>>> - * The ASF licenses this file to You under the Apache license, Version
>>> 2.0
>>> - * (the "License"); you may not use this file except in compliance with
>>> - * the License. You may obtain a copy of the License at
>>> - *
>>> - *      http://www.apache.org/licenses/LICENSE-2.0
>>> - *
>>> - * Unless required by applicable law or agreed to in writing, software
>>> - * distributed under the License is distributed on an "AS IS" BASIS,
>>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>>> implied.
>>> - * See the license for the specific language governing permissions and
>>> - * limitations under the license.
>>> - */
>>> -
>>> -package org.apache.logging.log4j.core.appender.mom.kafka;
>>> -
>>> -import java.util.Properties;
>>> -import java.util.concurrent.ExecutionException;
>>> -import java.util.concurrent.TimeUnit;
>>> -import java.util.concurrent.TimeoutException;
>>> -
>>> -import org.apache.kafka.clients.producer.Producer;
>>> -import org.apache.kafka.clients.producer.ProducerRecord;
>>> -import org.apache.logging.log4j.core.appender.AbstractManager;
>>> -import org.apache.logging.log4j.core.config.Property;
>>> -import org.apache.logging.log4j.core.util.Log4jThread;
>>> -
>>> -public class KafkaManager extends AbstractManager {
>>> -
>>> -    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
>>> -
>>> -    /**
>>> -     * package-private access for testing.
>>> -     */
>>> -    static KafkaProducerFactory producerFactory = new
>>> DefaultKafkaProducerFactory();
>>> -
>>> -    private final Properties config = new Properties();
>>> -    private Producer<byte[], byte[]> producer = null;
>>> -    private final int timeoutMillis;
>>> -
>>> -    private final String topic;
>>> -
>>> -    public KafkaManager(final String name, final String topic, final
>>> Property[] properties) {
>>> -        super(name);
>>> -        this.topic = topic;
>>> -        config.setProperty("key.serializer",
>>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>>> -        config.setProperty("value.serializer",
>>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>>> -        config.setProperty("batch.size", "0");
>>> -        for (final Property property : properties) {
>>> -            config.setProperty(property.getName(),
>>> property.getValue());
>>> -        }
>>> -        this.timeoutMillis = Integer.parseInt(config.getProperty("
>>> timeout.ms", DEFAULT_TIMEOUT_MILLIS));
>>> -    }
>>> -
>>> -    @Override
>>> -    public void releaseSub() {
>>> -        if (producer != null) {
>>> -            // This thread is a workaround for this Kafka issue:
>>> https://issues.apache.org/jira/browse/KAFKA-1660
>>> -            final Thread closeThread = new Log4jThread(new Runnable() {
>>> -                @Override
>>> -                public void run() {
>>> -                    producer.close();
>>> -                }
>>> -            });
>>> -            closeThread.setName("KafkaManager-CloseThread");
>>> -            closeThread.setDaemon(true); // avoid blocking JVM shutdown
>>> -            closeThread.start();
>>> -            try {
>>> -                closeThread.join(timeoutMillis);
>>> -            } catch (final InterruptedException ignore) {
>>> -                // ignore
>>> -            }
>>> -        }
>>> -    }
>>> -
>>> -    public void send(final byte[] msg) throws ExecutionException,
>>> InterruptedException, TimeoutException {
>>> -        if (producer != null) {
>>> -            producer.send(new ProducerRecord<byte[], byte[]>(topic,
>>> msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
>>> -        }
>>> -    }
>>> -
>>> -    public void startup() {
>>> -        producer = producerFactory.newKafkaProducer(config);
>>> -    }
>>> -
>>> -    public String getTopic() {
>>> -        return topic;
>>> -    }
>>> -
>>> -}
>>> +/*
>>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>>> + * contributor license agreements. See the NOTICE file distributed with
>>> + * this work for additional information regarding copyright ownership.
>>> + * The ASF licenses this file to You under the Apache license, Version
>>> 2.0
>>> + * (the "License"); you may not use this file except in compliance with
>>> + * the License. You may obtain a copy of the License at
>>> + *
>>> + *      http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>>> implied.
>>> + * See the license for the specific language governing permissions and
>>> + * limitations under the license.
>>> + */
>>> +
>>> +package org.apache.logging.log4j.core.appender.mom.kafka;
>>> +
>>> +import java.util.Properties;
>>> +import java.util.concurrent.ExecutionException;
>>> +import java.util.concurrent.TimeUnit;
>>> +import java.util.concurrent.TimeoutException;
>>> +
>>> +import org.apache.kafka.clients.producer.Producer;
>>> +import org.apache.kafka.clients.producer.ProducerRecord;
>>> +import org.apache.logging.log4j.core.appender.AbstractManager;
>>> +import org.apache.logging.log4j.core.config.Property;
>>> +import org.apache.logging.log4j.core.util.Log4jThread;
>>> +
>>> +public class KafkaManager extends AbstractManager {
>>> +
>>> +    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
>>> +
>>> +    /**
>>> +     * package-private access for testing.
>>> +     */
>>> +    static KafkaProducerFactory producerFactory = new
>>> DefaultKafkaProducerFactory();
>>> +
>>> +    private final Properties config = new Properties();
>>> +    private Producer<byte[], byte[]> producer = null;
>>> +    private final int timeoutMillis;
>>> +
>>> +    private final String topic;
>>> +
>>> +    public KafkaManager(final String name, final String topic, final
>>> Property[] properties) {
>>> +        super(name);
>>> +        this.topic = topic;
>>> +        config.setProperty("key.serializer",
>>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>>> +        config.setProperty("value.serializer",
>>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>>> +        config.setProperty("batch.size", "0");
>>> +        for (final Property property : properties) {
>>> +            config.setProperty(property.getName(),
>>> property.getValue());
>>> +        }
>>> +        this.timeoutMillis = Integer.parseInt(config.getProperty("
>>> timeout.ms", DEFAULT_TIMEOUT_MILLIS));
>>> +    }
>>> +
>>> +    @Override
>>> +    public void releaseSub() {
>>> +        if (producer != null) {
>>> +            // This thread is a workaround for this Kafka issue:
>>> https://issues.apache.org/jira/browse/KAFKA-1660
>>> +            final Thread closeThread = new Log4jThread(new Runnable() {
>>> +                @Override
>>> +                public void run() {
>>> +                    producer.close();
>>> +                }
>>> +            }, "KafkaManager-CloseThread");
>>> +            closeThread.setDaemon(true); // avoid blocking JVM shutdown
>>> +            closeThread.start();
>>> +            try {
>>> +                closeThread.join(timeoutMillis);
>>> +            } catch (final InterruptedException ignore) {
>>> +                // ignore
>>> +            }
>>> +        }
>>> +    }
>>> +
>>> +    public void send(final byte[] msg) throws ExecutionException,
>>> InterruptedException, TimeoutException {
>>> +        if (producer != null) {
>>> +            producer.send(new ProducerRecord<byte[], byte[]>(topic,
>>> msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
>>> +        }
>>> +    }
>>> +
>>> +    public void startup() {
>>> +        producer = producerFactory.newKafkaProducer(config);
>>> +    }
>>> +
>>> +    public String getTopic() {
>>> +        return topic;
>>> +    }
>>> +
>>> +}
>>>
>>>
>>
>
>
> --
> Matt Sicker <bo...@gmail.com>
>



-- 
E-Mail: garydgregory@gmail.com | ggregory@apache.org
Java Persistence with Hibernate, Second Edition
<http://www.manning.com/bauer3/>
JUnit in Action, Second Edition <http://www.manning.com/tahchiev/>
Spring Batch in Action <http://www.manning.com/templier/>
Blog: http://garygregory.wordpress.com
Home: http://garygregory.com/
Tweet! http://twitter.com/GaryGregory

Re: logging-log4j2 git commit: Use Log4jThread to name the thread.

Posted by Matt Sicker <bo...@gmail.com>.
I use a similar setting in IntelliJ. It usually has a banner on the top of
the file when you open it to let you know if the file isn't formatted with
the same basic settings as the project (e.g., line endings, tabs versus
spaces, number of spaces to an indent).

On 20 August 2016 at 19:46, Remko Popma <re...@gmail.com> wrote:

> Looks like my IDE is set to use "system-dependent" line endings for new
> files.
> (I think that means that for existing files it will use the existing line
> endings.)
>
> Perhaps this is a formatter issue then? Gary, do you use Eclipse to format
> before committing? Perhaps the formatter has some setting to not change
> line endings?
>
> On Sun, Aug 21, 2016 at 9:35 AM, Matt Sicker <bo...@gmail.com> wrote:
>
>> Ideally, we should just do an all at once conversion to unix line endings
>> or windows line endings to all branches. Then the line ending fixes won't
>> come in anymore. We could add an EditorConfig file to enforce it
>> automatically in IDEs even.
>>
>> On 20 August 2016 at 19:33, Remko Popma <re...@gmail.com> wrote:
>>
>>> Gary, can you please check your IDE settings for line endings?
>>> It is really difficult to see what changed if the whole file is marked
>>> as modified in the commit mail.
>>>
>>> I suspect this is because your IDE converts the unix line endings to
>>> windows CRLF or something.
>>>
>>> On Sun, Aug 21, 2016 at 9:30 AM, <gg...@apache.org> wrote:
>>>
>>>> Repository: logging-log4j2
>>>> Updated Branches:
>>>>   refs/heads/master 1aa3c3e24 -> f1b61bddf
>>>>
>>>>
>>>> Use Log4jThread to name the thread.
>>>>
>>>> Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
>>>> Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit
>>>> /f1b61bdd
>>>> Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/f
>>>> 1b61bdd
>>>> Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/f
>>>> 1b61bdd
>>>>
>>>> Branch: refs/heads/master
>>>> Commit: f1b61bddf640ecc5d2c80ea190ff7332577cb787
>>>> Parents: 1aa3c3e
>>>> Author: Gary Gregory <gg...@apache.org>
>>>> Authored: Sat Aug 20 17:30:16 2016 -0700
>>>> Committer: Gary Gregory <gg...@apache.org>
>>>> Committed: Sat Aug 20 17:30:16 2016 -0700
>>>>
>>>> ----------------------------------------------------------------------
>>>>  .../core/appender/mom/kafka/KafkaManager.java   | 185
>>>> +++++++++----------
>>>>  1 file changed, 92 insertions(+), 93 deletions(-)
>>>> ----------------------------------------------------------------------
>>>>
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f
>>>> 1b61bdd/log4j-core/src/main/java/org/apache/logging/log4j/co
>>>> re/appender/mom/kafka/KafkaManager.java
>>>> ----------------------------------------------------------------------
>>>> diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>>>> ender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org
>>>> /apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
>>>> index 4e4a09c..d535e02 100644
>>>> --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>>>> ender/mom/kafka/KafkaManager.java
>>>> +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>>>> ender/mom/kafka/KafkaManager.java
>>>> @@ -1,93 +1,92 @@
>>>> -/*
>>>> - * Licensed to the Apache Software Foundation (ASF) under one or more
>>>> - * contributor license agreements. See the NOTICE file distributed with
>>>> - * this work for additional information regarding copyright ownership.
>>>> - * The ASF licenses this file to You under the Apache license, Version
>>>> 2.0
>>>> - * (the "License"); you may not use this file except in compliance with
>>>> - * the License. You may obtain a copy of the License at
>>>> - *
>>>> - *      http://www.apache.org/licenses/LICENSE-2.0
>>>> - *
>>>> - * Unless required by applicable law or agreed to in writing, software
>>>> - * distributed under the License is distributed on an "AS IS" BASIS,
>>>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>>>> implied.
>>>> - * See the license for the specific language governing permissions and
>>>> - * limitations under the license.
>>>> - */
>>>> -
>>>> -package org.apache.logging.log4j.core.appender.mom.kafka;
>>>> -
>>>> -import java.util.Properties;
>>>> -import java.util.concurrent.ExecutionException;
>>>> -import java.util.concurrent.TimeUnit;
>>>> -import java.util.concurrent.TimeoutException;
>>>> -
>>>> -import org.apache.kafka.clients.producer.Producer;
>>>> -import org.apache.kafka.clients.producer.ProducerRecord;
>>>> -import org.apache.logging.log4j.core.appender.AbstractManager;
>>>> -import org.apache.logging.log4j.core.config.Property;
>>>> -import org.apache.logging.log4j.core.util.Log4jThread;
>>>> -
>>>> -public class KafkaManager extends AbstractManager {
>>>> -
>>>> -    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
>>>> -
>>>> -    /**
>>>> -     * package-private access for testing.
>>>> -     */
>>>> -    static KafkaProducerFactory producerFactory = new
>>>> DefaultKafkaProducerFactory();
>>>> -
>>>> -    private final Properties config = new Properties();
>>>> -    private Producer<byte[], byte[]> producer = null;
>>>> -    private final int timeoutMillis;
>>>> -
>>>> -    private final String topic;
>>>> -
>>>> -    public KafkaManager(final String name, final String topic, final
>>>> Property[] properties) {
>>>> -        super(name);
>>>> -        this.topic = topic;
>>>> -        config.setProperty("key.serializer",
>>>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>>>> -        config.setProperty("value.serializer",
>>>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>>>> -        config.setProperty("batch.size", "0");
>>>> -        for (final Property property : properties) {
>>>> -            config.setProperty(property.getName(),
>>>> property.getValue());
>>>> -        }
>>>> -        this.timeoutMillis = Integer.parseInt(config.getProperty("
>>>> timeout.ms", DEFAULT_TIMEOUT_MILLIS));
>>>> -    }
>>>> -
>>>> -    @Override
>>>> -    public void releaseSub() {
>>>> -        if (producer != null) {
>>>> -            // This thread is a workaround for this Kafka issue:
>>>> https://issues.apache.org/jira/browse/KAFKA-1660
>>>> -            final Thread closeThread = new Log4jThread(new Runnable() {
>>>> -                @Override
>>>> -                public void run() {
>>>> -                    producer.close();
>>>> -                }
>>>> -            });
>>>> -            closeThread.setName("KafkaManager-CloseThread");
>>>> -            closeThread.setDaemon(true); // avoid blocking JVM shutdown
>>>> -            closeThread.start();
>>>> -            try {
>>>> -                closeThread.join(timeoutMillis);
>>>> -            } catch (final InterruptedException ignore) {
>>>> -                // ignore
>>>> -            }
>>>> -        }
>>>> -    }
>>>> -
>>>> -    public void send(final byte[] msg) throws ExecutionException,
>>>> InterruptedException, TimeoutException {
>>>> -        if (producer != null) {
>>>> -            producer.send(new ProducerRecord<byte[], byte[]>(topic,
>>>> msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
>>>> -        }
>>>> -    }
>>>> -
>>>> -    public void startup() {
>>>> -        producer = producerFactory.newKafkaProducer(config);
>>>> -    }
>>>> -
>>>> -    public String getTopic() {
>>>> -        return topic;
>>>> -    }
>>>> -
>>>> -}
>>>> +/*
>>>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>>>> + * contributor license agreements. See the NOTICE file distributed with
>>>> + * this work for additional information regarding copyright ownership.
>>>> + * The ASF licenses this file to You under the Apache license, Version
>>>> 2.0
>>>> + * (the "License"); you may not use this file except in compliance with
>>>> + * the License. You may obtain a copy of the License at
>>>> + *
>>>> + *      http://www.apache.org/licenses/LICENSE-2.0
>>>> + *
>>>> + * Unless required by applicable law or agreed to in writing, software
>>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>>>> implied.
>>>> + * See the license for the specific language governing permissions and
>>>> + * limitations under the license.
>>>> + */
>>>> +
>>>> +package org.apache.logging.log4j.core.appender.mom.kafka;
>>>> +
>>>> +import java.util.Properties;
>>>> +import java.util.concurrent.ExecutionException;
>>>> +import java.util.concurrent.TimeUnit;
>>>> +import java.util.concurrent.TimeoutException;
>>>> +
>>>> +import org.apache.kafka.clients.producer.Producer;
>>>> +import org.apache.kafka.clients.producer.ProducerRecord;
>>>> +import org.apache.logging.log4j.core.appender.AbstractManager;
>>>> +import org.apache.logging.log4j.core.config.Property;
>>>> +import org.apache.logging.log4j.core.util.Log4jThread;
>>>> +
>>>> +public class KafkaManager extends AbstractManager {
>>>> +
>>>> +    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
>>>> +
>>>> +    /**
>>>> +     * package-private access for testing.
>>>> +     */
>>>> +    static KafkaProducerFactory producerFactory = new
>>>> DefaultKafkaProducerFactory();
>>>> +
>>>> +    private final Properties config = new Properties();
>>>> +    private Producer<byte[], byte[]> producer = null;
>>>> +    private final int timeoutMillis;
>>>> +
>>>> +    private final String topic;
>>>> +
>>>> +    public KafkaManager(final String name, final String topic, final
>>>> Property[] properties) {
>>>> +        super(name);
>>>> +        this.topic = topic;
>>>> +        config.setProperty("key.serializer",
>>>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>>>> +        config.setProperty("value.serializer",
>>>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>>>> +        config.setProperty("batch.size", "0");
>>>> +        for (final Property property : properties) {
>>>> +            config.setProperty(property.getName(),
>>>> property.getValue());
>>>> +        }
>>>> +        this.timeoutMillis = Integer.parseInt(config.getProperty("
>>>> timeout.ms", DEFAULT_TIMEOUT_MILLIS));
>>>> +    }
>>>> +
>>>> +    @Override
>>>> +    public void releaseSub() {
>>>> +        if (producer != null) {
>>>> +            // This thread is a workaround for this Kafka issue:
>>>> https://issues.apache.org/jira/browse/KAFKA-1660
>>>> +            final Thread closeThread = new Log4jThread(new Runnable() {
>>>> +                @Override
>>>> +                public void run() {
>>>> +                    producer.close();
>>>> +                }
>>>> +            }, "KafkaManager-CloseThread");
>>>> +            closeThread.setDaemon(true); // avoid blocking JVM shutdown
>>>> +            closeThread.start();
>>>> +            try {
>>>> +                closeThread.join(timeoutMillis);
>>>> +            } catch (final InterruptedException ignore) {
>>>> +                // ignore
>>>> +            }
>>>> +        }
>>>> +    }
>>>> +
>>>> +    public void send(final byte[] msg) throws ExecutionException,
>>>> InterruptedException, TimeoutException {
>>>> +        if (producer != null) {
>>>> +            producer.send(new ProducerRecord<byte[], byte[]>(topic,
>>>> msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
>>>> +        }
>>>> +    }
>>>> +
>>>> +    public void startup() {
>>>> +        producer = producerFactory.newKafkaProducer(config);
>>>> +    }
>>>> +
>>>> +    public String getTopic() {
>>>> +        return topic;
>>>> +    }
>>>> +
>>>> +}
>>>>
>>>>
>>>
>>
>>
>> --
>> Matt Sicker <bo...@gmail.com>
>>
>
>


-- 
Matt Sicker <bo...@gmail.com>

Re: logging-log4j2 git commit: Use Log4jThread to name the thread.

Posted by Remko Popma <re...@gmail.com>.
Looks like my IDE is set to use "system-dependent" line endings for new
files.
(I think that means that for existing files it will use the existing line
endings.)

Perhaps this is a formatter issue then? Gary, do you use Eclipse to format
before committing? Perhaps the formatter has some setting to not change
line endings?

On Sun, Aug 21, 2016 at 9:35 AM, Matt Sicker <bo...@gmail.com> wrote:

> Ideally, we should just do an all at once conversion to unix line endings
> or windows line endings to all branches. Then the line ending fixes won't
> come in anymore. We could add an EditorConfig file to enforce it
> automatically in IDEs even.
>
> On 20 August 2016 at 19:33, Remko Popma <re...@gmail.com> wrote:
>
>> Gary, can you please check your IDE settings for line endings?
>> It is really difficult to see what changed if the whole file is marked as
>> modified in the commit mail.
>>
>> I suspect this is because your IDE converts the unix line endings to
>> windows CRLF or something.
>>
>> On Sun, Aug 21, 2016 at 9:30 AM, <gg...@apache.org> wrote:
>>
>>> Repository: logging-log4j2
>>> Updated Branches:
>>>   refs/heads/master 1aa3c3e24 -> f1b61bddf
>>>
>>>
>>> Use Log4jThread to name the thread.
>>>
>>> Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
>>> Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit
>>> /f1b61bdd
>>> Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/f
>>> 1b61bdd
>>> Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/f
>>> 1b61bdd
>>>
>>> Branch: refs/heads/master
>>> Commit: f1b61bddf640ecc5d2c80ea190ff7332577cb787
>>> Parents: 1aa3c3e
>>> Author: Gary Gregory <gg...@apache.org>
>>> Authored: Sat Aug 20 17:30:16 2016 -0700
>>> Committer: Gary Gregory <gg...@apache.org>
>>> Committed: Sat Aug 20 17:30:16 2016 -0700
>>>
>>> ----------------------------------------------------------------------
>>>  .../core/appender/mom/kafka/KafkaManager.java   | 185
>>> +++++++++----------
>>>  1 file changed, 92 insertions(+), 93 deletions(-)
>>> ----------------------------------------------------------------------
>>>
>>>
>>> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f
>>> 1b61bdd/log4j-core/src/main/java/org/apache/logging/log4j/co
>>> re/appender/mom/kafka/KafkaManager.java
>>> ----------------------------------------------------------------------
>>> diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>>> ender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org
>>> /apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
>>> index 4e4a09c..d535e02 100644
>>> --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>>> ender/mom/kafka/KafkaManager.java
>>> +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>>> ender/mom/kafka/KafkaManager.java
>>> @@ -1,93 +1,92 @@
>>> -/*
>>> - * Licensed to the Apache Software Foundation (ASF) under one or more
>>> - * contributor license agreements. See the NOTICE file distributed with
>>> - * this work for additional information regarding copyright ownership.
>>> - * The ASF licenses this file to You under the Apache license, Version
>>> 2.0
>>> - * (the "License"); you may not use this file except in compliance with
>>> - * the License. You may obtain a copy of the License at
>>> - *
>>> - *      http://www.apache.org/licenses/LICENSE-2.0
>>> - *
>>> - * Unless required by applicable law or agreed to in writing, software
>>> - * distributed under the License is distributed on an "AS IS" BASIS,
>>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>>> implied.
>>> - * See the license for the specific language governing permissions and
>>> - * limitations under the license.
>>> - */
>>> -
>>> -package org.apache.logging.log4j.core.appender.mom.kafka;
>>> -
>>> -import java.util.Properties;
>>> -import java.util.concurrent.ExecutionException;
>>> -import java.util.concurrent.TimeUnit;
>>> -import java.util.concurrent.TimeoutException;
>>> -
>>> -import org.apache.kafka.clients.producer.Producer;
>>> -import org.apache.kafka.clients.producer.ProducerRecord;
>>> -import org.apache.logging.log4j.core.appender.AbstractManager;
>>> -import org.apache.logging.log4j.core.config.Property;
>>> -import org.apache.logging.log4j.core.util.Log4jThread;
>>> -
>>> -public class KafkaManager extends AbstractManager {
>>> -
>>> -    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
>>> -
>>> -    /**
>>> -     * package-private access for testing.
>>> -     */
>>> -    static KafkaProducerFactory producerFactory = new
>>> DefaultKafkaProducerFactory();
>>> -
>>> -    private final Properties config = new Properties();
>>> -    private Producer<byte[], byte[]> producer = null;
>>> -    private final int timeoutMillis;
>>> -
>>> -    private final String topic;
>>> -
>>> -    public KafkaManager(final String name, final String topic, final
>>> Property[] properties) {
>>> -        super(name);
>>> -        this.topic = topic;
>>> -        config.setProperty("key.serializer",
>>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>>> -        config.setProperty("value.serializer",
>>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>>> -        config.setProperty("batch.size", "0");
>>> -        for (final Property property : properties) {
>>> -            config.setProperty(property.getName(),
>>> property.getValue());
>>> -        }
>>> -        this.timeoutMillis = Integer.parseInt(config.getProperty("
>>> timeout.ms", DEFAULT_TIMEOUT_MILLIS));
>>> -    }
>>> -
>>> -    @Override
>>> -    public void releaseSub() {
>>> -        if (producer != null) {
>>> -            // This thread is a workaround for this Kafka issue:
>>> https://issues.apache.org/jira/browse/KAFKA-1660
>>> -            final Thread closeThread = new Log4jThread(new Runnable() {
>>> -                @Override
>>> -                public void run() {
>>> -                    producer.close();
>>> -                }
>>> -            });
>>> -            closeThread.setName("KafkaManager-CloseThread");
>>> -            closeThread.setDaemon(true); // avoid blocking JVM shutdown
>>> -            closeThread.start();
>>> -            try {
>>> -                closeThread.join(timeoutMillis);
>>> -            } catch (final InterruptedException ignore) {
>>> -                // ignore
>>> -            }
>>> -        }
>>> -    }
>>> -
>>> -    public void send(final byte[] msg) throws ExecutionException,
>>> InterruptedException, TimeoutException {
>>> -        if (producer != null) {
>>> -            producer.send(new ProducerRecord<byte[], byte[]>(topic,
>>> msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
>>> -        }
>>> -    }
>>> -
>>> -    public void startup() {
>>> -        producer = producerFactory.newKafkaProducer(config);
>>> -    }
>>> -
>>> -    public String getTopic() {
>>> -        return topic;
>>> -    }
>>> -
>>> -}
>>> +/*
>>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>>> + * contributor license agreements. See the NOTICE file distributed with
>>> + * this work for additional information regarding copyright ownership.
>>> + * The ASF licenses this file to You under the Apache license, Version
>>> 2.0
>>> + * (the "License"); you may not use this file except in compliance with
>>> + * the License. You may obtain a copy of the License at
>>> + *
>>> + *      http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>>> implied.
>>> + * See the license for the specific language governing permissions and
>>> + * limitations under the license.
>>> + */
>>> +
>>> +package org.apache.logging.log4j.core.appender.mom.kafka;
>>> +
>>> +import java.util.Properties;
>>> +import java.util.concurrent.ExecutionException;
>>> +import java.util.concurrent.TimeUnit;
>>> +import java.util.concurrent.TimeoutException;
>>> +
>>> +import org.apache.kafka.clients.producer.Producer;
>>> +import org.apache.kafka.clients.producer.ProducerRecord;
>>> +import org.apache.logging.log4j.core.appender.AbstractManager;
>>> +import org.apache.logging.log4j.core.config.Property;
>>> +import org.apache.logging.log4j.core.util.Log4jThread;
>>> +
>>> +public class KafkaManager extends AbstractManager {
>>> +
>>> +    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
>>> +
>>> +    /**
>>> +     * package-private access for testing.
>>> +     */
>>> +    static KafkaProducerFactory producerFactory = new
>>> DefaultKafkaProducerFactory();
>>> +
>>> +    private final Properties config = new Properties();
>>> +    private Producer<byte[], byte[]> producer = null;
>>> +    private final int timeoutMillis;
>>> +
>>> +    private final String topic;
>>> +
>>> +    public KafkaManager(final String name, final String topic, final
>>> Property[] properties) {
>>> +        super(name);
>>> +        this.topic = topic;
>>> +        config.setProperty("key.serializer",
>>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>>> +        config.setProperty("value.serializer",
>>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>>> +        config.setProperty("batch.size", "0");
>>> +        for (final Property property : properties) {
>>> +            config.setProperty(property.getName(),
>>> property.getValue());
>>> +        }
>>> +        this.timeoutMillis = Integer.parseInt(config.getProperty("
>>> timeout.ms", DEFAULT_TIMEOUT_MILLIS));
>>> +    }
>>> +
>>> +    @Override
>>> +    public void releaseSub() {
>>> +        if (producer != null) {
>>> +            // This thread is a workaround for this Kafka issue:
>>> https://issues.apache.org/jira/browse/KAFKA-1660
>>> +            final Thread closeThread = new Log4jThread(new Runnable() {
>>> +                @Override
>>> +                public void run() {
>>> +                    producer.close();
>>> +                }
>>> +            }, "KafkaManager-CloseThread");
>>> +            closeThread.setDaemon(true); // avoid blocking JVM shutdown
>>> +            closeThread.start();
>>> +            try {
>>> +                closeThread.join(timeoutMillis);
>>> +            } catch (final InterruptedException ignore) {
>>> +                // ignore
>>> +            }
>>> +        }
>>> +    }
>>> +
>>> +    public void send(final byte[] msg) throws ExecutionException,
>>> InterruptedException, TimeoutException {
>>> +        if (producer != null) {
>>> +            producer.send(new ProducerRecord<byte[], byte[]>(topic,
>>> msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
>>> +        }
>>> +    }
>>> +
>>> +    public void startup() {
>>> +        producer = producerFactory.newKafkaProducer(config);
>>> +    }
>>> +
>>> +    public String getTopic() {
>>> +        return topic;
>>> +    }
>>> +
>>> +}
>>>
>>>
>>
>
>
> --
> Matt Sicker <bo...@gmail.com>
>

Re: logging-log4j2 git commit: Use Log4jThread to name the thread.

Posted by Matt Sicker <bo...@gmail.com>.
Ideally, we should just do an all at once conversion to unix line endings
or windows line endings to all branches. Then the line ending fixes won't
come in anymore. We could add an EditorConfig file to enforce it
automatically in IDEs even.

On 20 August 2016 at 19:33, Remko Popma <re...@gmail.com> wrote:

> Gary, can you please check your IDE settings for line endings?
> It is really difficult to see what changed if the whole file is marked as
> modified in the commit mail.
>
> I suspect this is because your IDE converts the unix line endings to
> windows CRLF or something.
>
> On Sun, Aug 21, 2016 at 9:30 AM, <gg...@apache.org> wrote:
>
>> Repository: logging-log4j2
>> Updated Branches:
>>   refs/heads/master 1aa3c3e24 -> f1b61bddf
>>
>>
>> Use Log4jThread to name the thread.
>>
>> Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit
>> /f1b61bdd
>> Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/f1b61bdd
>> Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/f1b61bdd
>>
>> Branch: refs/heads/master
>> Commit: f1b61bddf640ecc5d2c80ea190ff7332577cb787
>> Parents: 1aa3c3e
>> Author: Gary Gregory <gg...@apache.org>
>> Authored: Sat Aug 20 17:30:16 2016 -0700
>> Committer: Gary Gregory <gg...@apache.org>
>> Committed: Sat Aug 20 17:30:16 2016 -0700
>>
>> ----------------------------------------------------------------------
>>  .../core/appender/mom/kafka/KafkaManager.java   | 185
>> +++++++++----------
>>  1 file changed, 92 insertions(+), 93 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f
>> 1b61bdd/log4j-core/src/main/java/org/apache/logging/log4j/co
>> re/appender/mom/kafka/KafkaManager.java
>> ----------------------------------------------------------------------
>> diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>> ender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org
>> /apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
>> index 4e4a09c..d535e02 100644
>> --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>> ender/mom/kafka/KafkaManager.java
>> +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/app
>> ender/mom/kafka/KafkaManager.java
>> @@ -1,93 +1,92 @@
>> -/*
>> - * Licensed to the Apache Software Foundation (ASF) under one or more
>> - * contributor license agreements. See the NOTICE file distributed with
>> - * this work for additional information regarding copyright ownership.
>> - * The ASF licenses this file to You under the Apache license, Version
>> 2.0
>> - * (the "License"); you may not use this file except in compliance with
>> - * the License. You may obtain a copy of the License at
>> - *
>> - *      http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> - * See the license for the specific language governing permissions and
>> - * limitations under the license.
>> - */
>> -
>> -package org.apache.logging.log4j.core.appender.mom.kafka;
>> -
>> -import java.util.Properties;
>> -import java.util.concurrent.ExecutionException;
>> -import java.util.concurrent.TimeUnit;
>> -import java.util.concurrent.TimeoutException;
>> -
>> -import org.apache.kafka.clients.producer.Producer;
>> -import org.apache.kafka.clients.producer.ProducerRecord;
>> -import org.apache.logging.log4j.core.appender.AbstractManager;
>> -import org.apache.logging.log4j.core.config.Property;
>> -import org.apache.logging.log4j.core.util.Log4jThread;
>> -
>> -public class KafkaManager extends AbstractManager {
>> -
>> -    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
>> -
>> -    /**
>> -     * package-private access for testing.
>> -     */
>> -    static KafkaProducerFactory producerFactory = new
>> DefaultKafkaProducerFactory();
>> -
>> -    private final Properties config = new Properties();
>> -    private Producer<byte[], byte[]> producer = null;
>> -    private final int timeoutMillis;
>> -
>> -    private final String topic;
>> -
>> -    public KafkaManager(final String name, final String topic, final
>> Property[] properties) {
>> -        super(name);
>> -        this.topic = topic;
>> -        config.setProperty("key.serializer",
>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>> -        config.setProperty("value.serializer",
>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>> -        config.setProperty("batch.size", "0");
>> -        for (final Property property : properties) {
>> -            config.setProperty(property.getName(), property.getValue());
>> -        }
>> -        this.timeoutMillis = Integer.parseInt(config.getProperty("
>> timeout.ms", DEFAULT_TIMEOUT_MILLIS));
>> -    }
>> -
>> -    @Override
>> -    public void releaseSub() {
>> -        if (producer != null) {
>> -            // This thread is a workaround for this Kafka issue:
>> https://issues.apache.org/jira/browse/KAFKA-1660
>> -            final Thread closeThread = new Log4jThread(new Runnable() {
>> -                @Override
>> -                public void run() {
>> -                    producer.close();
>> -                }
>> -            });
>> -            closeThread.setName("KafkaManager-CloseThread");
>> -            closeThread.setDaemon(true); // avoid blocking JVM shutdown
>> -            closeThread.start();
>> -            try {
>> -                closeThread.join(timeoutMillis);
>> -            } catch (final InterruptedException ignore) {
>> -                // ignore
>> -            }
>> -        }
>> -    }
>> -
>> -    public void send(final byte[] msg) throws ExecutionException,
>> InterruptedException, TimeoutException {
>> -        if (producer != null) {
>> -            producer.send(new ProducerRecord<byte[], byte[]>(topic,
>> msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
>> -        }
>> -    }
>> -
>> -    public void startup() {
>> -        producer = producerFactory.newKafkaProducer(config);
>> -    }
>> -
>> -    public String getTopic() {
>> -        return topic;
>> -    }
>> -
>> -}
>> +/*
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements. See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache license, Version
>> 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License. You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the license for the specific language governing permissions and
>> + * limitations under the license.
>> + */
>> +
>> +package org.apache.logging.log4j.core.appender.mom.kafka;
>> +
>> +import java.util.Properties;
>> +import java.util.concurrent.ExecutionException;
>> +import java.util.concurrent.TimeUnit;
>> +import java.util.concurrent.TimeoutException;
>> +
>> +import org.apache.kafka.clients.producer.Producer;
>> +import org.apache.kafka.clients.producer.ProducerRecord;
>> +import org.apache.logging.log4j.core.appender.AbstractManager;
>> +import org.apache.logging.log4j.core.config.Property;
>> +import org.apache.logging.log4j.core.util.Log4jThread;
>> +
>> +public class KafkaManager extends AbstractManager {
>> +
>> +    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
>> +
>> +    /**
>> +     * package-private access for testing.
>> +     */
>> +    static KafkaProducerFactory producerFactory = new
>> DefaultKafkaProducerFactory();
>> +
>> +    private final Properties config = new Properties();
>> +    private Producer<byte[], byte[]> producer = null;
>> +    private final int timeoutMillis;
>> +
>> +    private final String topic;
>> +
>> +    public KafkaManager(final String name, final String topic, final
>> Property[] properties) {
>> +        super(name);
>> +        this.topic = topic;
>> +        config.setProperty("key.serializer",
>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>> +        config.setProperty("value.serializer",
>> "org.apache.kafka.common.serialization.ByteArraySerializer");
>> +        config.setProperty("batch.size", "0");
>> +        for (final Property property : properties) {
>> +            config.setProperty(property.getName(), property.getValue());
>> +        }
>> +        this.timeoutMillis = Integer.parseInt(config.getProperty("
>> timeout.ms", DEFAULT_TIMEOUT_MILLIS));
>> +    }
>> +
>> +    @Override
>> +    public void releaseSub() {
>> +        if (producer != null) {
>> +            // This thread is a workaround for this Kafka issue:
>> https://issues.apache.org/jira/browse/KAFKA-1660
>> +            final Thread closeThread = new Log4jThread(new Runnable() {
>> +                @Override
>> +                public void run() {
>> +                    producer.close();
>> +                }
>> +            }, "KafkaManager-CloseThread");
>> +            closeThread.setDaemon(true); // avoid blocking JVM shutdown
>> +            closeThread.start();
>> +            try {
>> +                closeThread.join(timeoutMillis);
>> +            } catch (final InterruptedException ignore) {
>> +                // ignore
>> +            }
>> +        }
>> +    }
>> +
>> +    public void send(final byte[] msg) throws ExecutionException,
>> InterruptedException, TimeoutException {
>> +        if (producer != null) {
>> +            producer.send(new ProducerRecord<byte[], byte[]>(topic,
>> msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
>> +        }
>> +    }
>> +
>> +    public void startup() {
>> +        producer = producerFactory.newKafkaProducer(config);
>> +    }
>> +
>> +    public String getTopic() {
>> +        return topic;
>> +    }
>> +
>> +}
>>
>>
>


-- 
Matt Sicker <bo...@gmail.com>

Re: logging-log4j2 git commit: Use Log4jThread to name the thread.

Posted by Remko Popma <re...@gmail.com>.
Gary, can you please check your IDE settings for line endings?
It is really difficult to see what changed if the whole file is marked as
modified in the commit mail.

I suspect this is because your IDE converts the unix line endings to
windows CRLF or something.

On Sun, Aug 21, 2016 at 9:30 AM, <gg...@apache.org> wrote:

> Repository: logging-log4j2
> Updated Branches:
>   refs/heads/master 1aa3c3e24 -> f1b61bddf
>
>
> Use Log4jThread to name the thread.
>
> Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
> Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/
> commit/f1b61bdd
> Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/f1b61bdd
> Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/f1b61bdd
>
> Branch: refs/heads/master
> Commit: f1b61bddf640ecc5d2c80ea190ff7332577cb787
> Parents: 1aa3c3e
> Author: Gary Gregory <gg...@apache.org>
> Authored: Sat Aug 20 17:30:16 2016 -0700
> Committer: Gary Gregory <gg...@apache.org>
> Committed: Sat Aug 20 17:30:16 2016 -0700
>
> ----------------------------------------------------------------------
>  .../core/appender/mom/kafka/KafkaManager.java   | 185 +++++++++----------
>  1 file changed, 92 insertions(+), 93 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/
> f1b61bdd/log4j-core/src/main/java/org/apache/logging/log4j/
> core/appender/mom/kafka/KafkaManager.java
> ----------------------------------------------------------------------
> diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/
> appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/
> org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
> index 4e4a09c..d535e02 100644
> --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/
> appender/mom/kafka/KafkaManager.java
> +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/
> appender/mom/kafka/KafkaManager.java
> @@ -1,93 +1,92 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one or more
> - * contributor license agreements. See the NOTICE file distributed with
> - * this work for additional information regarding copyright ownership.
> - * The ASF licenses this file to You under the Apache license, Version 2.0
> - * (the "License"); you may not use this file except in compliance with
> - * the License. You may obtain a copy of the License at
> - *
> - *      http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> - * See the license for the specific language governing permissions and
> - * limitations under the license.
> - */
> -
> -package org.apache.logging.log4j.core.appender.mom.kafka;
> -
> -import java.util.Properties;
> -import java.util.concurrent.ExecutionException;
> -import java.util.concurrent.TimeUnit;
> -import java.util.concurrent.TimeoutException;
> -
> -import org.apache.kafka.clients.producer.Producer;
> -import org.apache.kafka.clients.producer.ProducerRecord;
> -import org.apache.logging.log4j.core.appender.AbstractManager;
> -import org.apache.logging.log4j.core.config.Property;
> -import org.apache.logging.log4j.core.util.Log4jThread;
> -
> -public class KafkaManager extends AbstractManager {
> -
> -    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
> -
> -    /**
> -     * package-private access for testing.
> -     */
> -    static KafkaProducerFactory producerFactory = new
> DefaultKafkaProducerFactory();
> -
> -    private final Properties config = new Properties();
> -    private Producer<byte[], byte[]> producer = null;
> -    private final int timeoutMillis;
> -
> -    private final String topic;
> -
> -    public KafkaManager(final String name, final String topic, final
> Property[] properties) {
> -        super(name);
> -        this.topic = topic;
> -        config.setProperty("key.serializer", "org.apache.kafka.common.
> serialization.ByteArraySerializer");
> -        config.setProperty("value.serializer", "org.apache.kafka.common.
> serialization.ByteArraySerializer");
> -        config.setProperty("batch.size", "0");
> -        for (final Property property : properties) {
> -            config.setProperty(property.getName(), property.getValue());
> -        }
> -        this.timeoutMillis = Integer.parseInt(config.getProperty("
> timeout.ms", DEFAULT_TIMEOUT_MILLIS));
> -    }
> -
> -    @Override
> -    public void releaseSub() {
> -        if (producer != null) {
> -            // This thread is a workaround for this Kafka issue:
> https://issues.apache.org/jira/browse/KAFKA-1660
> -            final Thread closeThread = new Log4jThread(new Runnable() {
> -                @Override
> -                public void run() {
> -                    producer.close();
> -                }
> -            });
> -            closeThread.setName("KafkaManager-CloseThread");
> -            closeThread.setDaemon(true); // avoid blocking JVM shutdown
> -            closeThread.start();
> -            try {
> -                closeThread.join(timeoutMillis);
> -            } catch (final InterruptedException ignore) {
> -                // ignore
> -            }
> -        }
> -    }
> -
> -    public void send(final byte[] msg) throws ExecutionException,
> InterruptedException, TimeoutException {
> -        if (producer != null) {
> -            producer.send(new ProducerRecord<byte[], byte[]>(topic,
> msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
> -        }
> -    }
> -
> -    public void startup() {
> -        producer = producerFactory.newKafkaProducer(config);
> -    }
> -
> -    public String getTopic() {
> -        return topic;
> -    }
> -
> -}
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache license, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the license for the specific language governing permissions and
> + * limitations under the license.
> + */
> +
> +package org.apache.logging.log4j.core.appender.mom.kafka;
> +
> +import java.util.Properties;
> +import java.util.concurrent.ExecutionException;
> +import java.util.concurrent.TimeUnit;
> +import java.util.concurrent.TimeoutException;
> +
> +import org.apache.kafka.clients.producer.Producer;
> +import org.apache.kafka.clients.producer.ProducerRecord;
> +import org.apache.logging.log4j.core.appender.AbstractManager;
> +import org.apache.logging.log4j.core.config.Property;
> +import org.apache.logging.log4j.core.util.Log4jThread;
> +
> +public class KafkaManager extends AbstractManager {
> +
> +    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
> +
> +    /**
> +     * package-private access for testing.
> +     */
> +    static KafkaProducerFactory producerFactory = new
> DefaultKafkaProducerFactory();
> +
> +    private final Properties config = new Properties();
> +    private Producer<byte[], byte[]> producer = null;
> +    private final int timeoutMillis;
> +
> +    private final String topic;
> +
> +    public KafkaManager(final String name, final String topic, final
> Property[] properties) {
> +        super(name);
> +        this.topic = topic;
> +        config.setProperty("key.serializer", "org.apache.kafka.common.
> serialization.ByteArraySerializer");
> +        config.setProperty("value.serializer", "org.apache.kafka.common.
> serialization.ByteArraySerializer");
> +        config.setProperty("batch.size", "0");
> +        for (final Property property : properties) {
> +            config.setProperty(property.getName(), property.getValue());
> +        }
> +        this.timeoutMillis = Integer.parseInt(config.getProperty("
> timeout.ms", DEFAULT_TIMEOUT_MILLIS));
> +    }
> +
> +    @Override
> +    public void releaseSub() {
> +        if (producer != null) {
> +            // This thread is a workaround for this Kafka issue:
> https://issues.apache.org/jira/browse/KAFKA-1660
> +            final Thread closeThread = new Log4jThread(new Runnable() {
> +                @Override
> +                public void run() {
> +                    producer.close();
> +                }
> +            }, "KafkaManager-CloseThread");
> +            closeThread.setDaemon(true); // avoid blocking JVM shutdown
> +            closeThread.start();
> +            try {
> +                closeThread.join(timeoutMillis);
> +            } catch (final InterruptedException ignore) {
> +                // ignore
> +            }
> +        }
> +    }
> +
> +    public void send(final byte[] msg) throws ExecutionException,
> InterruptedException, TimeoutException {
> +        if (producer != null) {
> +            producer.send(new ProducerRecord<byte[], byte[]>(topic,
> msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
> +        }
> +    }
> +
> +    public void startup() {
> +        producer = producerFactory.newKafkaProducer(config);
> +    }
> +
> +    public String getTopic() {
> +        return topic;
> +    }
> +
> +}
>
>