Posted to dev@chukwa.apache.org by ey...@apache.org on 2017/03/05 20:38:25 UTC
[1/2] chukwa git commit: CHUKWA-818. Convert Solr client to singleton to prevent connection leaks. (Eric Yang)
Repository: chukwa
Updated Branches:
refs/heads/master ffd0d400e -> 971641927
CHUKWA-818. Convert Solr client to singleton to prevent connection leaks. (Eric Yang)
Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/5bf50709
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/5bf50709
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/5bf50709
Branch: refs/heads/master
Commit: 5bf50709d070144bf958616111efa57c6c0475a5
Parents: ffd0d40
Author: Eric Yang <ey...@apache.org>
Authored: Sun Mar 5 12:36:15 2017 -0800
Committer: Eric Yang <ey...@apache.org>
Committed: Sun Mar 5 12:36:15 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../hadoop/chukwa/datacollection/writer/solr/SolrWriter.java | 3 +--
2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/chukwa/blob/5bf50709/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3780ac6..1d0eb56 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -29,6 +29,8 @@ Release 0.8 - 05/22/2016
   BUGS
+    CHUKWA-818. Convert Solr client to singleton to prevent connection leaks. (Eric Yang)
+
     CHUKWA-808. Fix solr startup for Docker support. (Eric Yang)
     CHUKWA-807. Update Docker support to current. (Eric Yang)
http://git-wip-us.apache.org/repos/asf/chukwa/blob/5bf50709/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
index a0fd7ff..369b135 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
@@ -38,7 +38,7 @@ import org.apache.solr.common.SolrInputDocument;
 public class SolrWriter extends PipelineableWriter {
   private static Logger log = Logger.getLogger(SolrWriter.class);
-  private CloudSolrClient client;
+  private static CloudSolrClient client;
   private final static String ID = "id";
   private final static String SEQ_ID = "seqId";
   private final static String DATA_TYPE = "type";
@@ -119,7 +119,6 @@ public class SolrWriter extends PipelineableWriter {
       } catch (Exception e) {
         log.warn("Failed to store data to Solr Cloud.");
         log.warn(ExceptionUtil.getStackTrace(e));
-        client = null;
       }
     }
     try {
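The SolrWriter change above makes the CloudSolrClient a JVM-wide singleton and stops nulling it out when a write fails, so a failed commit no longer strands a live client while the next add() builds a fresh one. A minimal sketch of the pattern, assuming a zkHost string from configuration and the SolrJ 5.x-era constructor (the holder class is illustrative, not Chukwa code):

    import org.apache.solr.client.solrj.impl.CloudSolrClient;

    public class SolrClientHolder {
      private static CloudSolrClient client;  // one shared client per JVM

      // zkHost is an assumed configuration value, e.g. "zk1:2181/solr"
      public static synchronized CloudSolrClient get(String zkHost) {
        if (client == null) {
          client = new CloudSolrClient(zkHost);  // created once, then reused
        }
        return client;  // on write errors, keep the client; do not null it
      }
    }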
[2/2] chukwa git commit: CHUKWA-817. Prevent connection leaks when HBaseWriter or parser generates errors. (Eric Yang)
Posted by ey...@apache.org.
CHUKWA-817. Prevent connection leaks when HBaseWriter or parser generates errors. (Eric Yang)
Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/97164192
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/97164192
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/97164192
Branch: refs/heads/master
Commit: 971641927fd399143329bcc0480e0f1e6b032a74
Parents: 5bf5070
Author: Eric Yang <ey...@apache.org>
Authored: Sun Mar 5 12:38:17 2017 -0800
Committer: Eric Yang <ey...@apache.org>
Committed: Sun Mar 5 12:38:17 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 6 +-
.../writer/PipelineStageWriter.java | 9 ++-
.../datacollection/writer/WriterException.java | 5 +-
.../writer/hbase/HBaseWriter.java | 27 ++++++--
.../hbase/HadoopMetricsProcessor.java | 70 ++++++++++++--------
.../inputtools/log4j/Log4jMetricsSink.java | 2 +-
6 files changed, 79 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1d0eb56..882c9ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,10 @@ Trunk (unreleased changes)
   BUGS
+    CHUKWA-818. Convert Solr client to singleton to prevent connection leaks. (Eric Yang)
+
+    CHUKWA-817. Prevent connections leaks when HBaseWriter or parser generate errors. (Eric Yang)
+
     CHUKWA-813. Fix logviewer for sorting log entries. (Eric Yang)
     CHUKWA-812. Added throttle to dashboard save. (Eric Yang)
@@ -29,8 +33,6 @@ Release 0.8 - 05/22/2016
   BUGS
-    CHUKWA-818. Convert Solr client to singleton to prevent connection leaks. (Eric Yang)
-
     CHUKWA-808. Fix solr startup for Docker support. (Eric Yang)
     CHUKWA-807. Update Docker support to current. (Eric Yang)
http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
index 141be20..bd98fb6 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
@@ -92,7 +92,11 @@ public class PipelineStageWriter implements ChukwaWriter {
     // if authentication type is kerberos; login using the specified kerberos principal and keytab file
     for(int i=0; i<classes.length; i++) {
       if(classes[i].contains("HBaseWriter")) {
-        loginToKerberos (conf);
+        try {
+          loginToKerberos (conf);
+        } catch(IOException e) {
+          throw new WriterException("Unable to login to Kerberos.");
+        }
       }
     }
@@ -111,8 +115,7 @@ public class PipelineStageWriter implements ChukwaWriter {
         writer = (ChukwaWriter) st; // one stage pipeline
       }
       return;
-    } catch (IOException |
-        WriterException |
+    } catch (WriterException |
         ClassNotFoundException |
         IllegalAccessException |
         InstantiationException e) {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java
index e2a2e45..dc0f0b1 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java
@@ -17,13 +17,14 @@
  */
 package org.apache.hadoop.chukwa.datacollection.writer;
+import java.io.IOException;
-public class WriterException extends Exception {
+public class WriterException extends IOException {
   /**
    *
    */
-  private static final long serialVersionUID = -4207275200546397145L;
+  private static final long serialVersionUID = -4207275200546397146L;
   public WriterException() {
   }
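Because WriterException now extends IOException, the earlier catch in PipelineStageWriter had to be narrowed: Java rejects a multi-catch whose alternatives are related by subclassing, so catch (IOException | WriterException e) would no longer compile. A self-contained illustration (the demo class only mirrors the new hierarchy; it is not Chukwa code):

    import java.io.IOException;

    public class CatchNarrowingDemo {
      // mirrors the new hierarchy: WriterException is-an IOException
      static class WriterException extends IOException { }

      public static void main(String[] args) {
        try {
          throw new WriterException();
        // } catch (IOException | WriterException e) {  // compile error:
        //   alternatives in a multi-catch cannot be related by subclassing
        } catch (WriterException e) {
          // catching the subclass suffices; callers that catch IOException
          // further up the stack will also see a WriterException
          System.out.println("caught WriterException");
        }
      }
    }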
http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
index 5ba87bd..4a67226 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
@@ -132,9 +132,18 @@ public class HBaseWriter extends PipelineableWriter {
   @Override
   public CommitStatus add(List<Chunk> chunks) throws WriterException {
     CommitStatus rv = ChukwaWriter.COMMIT_OK;
+    Table hbase;
+    Table meta;
     try {
-      Table hbase = connection.getTable(TableName.valueOf(CHUKWA_TABLE));
-      Table meta = connection.getTable(TableName.valueOf(CHUKWA_META_TABLE));
+      if (connection == null || connection.isClosed()) {
+        try {
+          connection = ConnectionFactory.createConnection(hconf);
+        } catch (IOException e) {
+          throw new WriterException("HBase is offline, retry later...");
+        }
+      }
+      hbase = connection.getTable(TableName.valueOf(CHUKWA_TABLE));
+      meta = connection.getTable(TableName.valueOf(CHUKWA_META_TABLE));
       for(Chunk chunk : chunks) {
         synchronized (this) {
           try {
@@ -143,7 +152,8 @@ public class HBaseWriter extends PipelineableWriter {
             hbase.put(output);
             meta.put(reporter.getInfo());
           } catch (Throwable e) {
-            log.warn(output);
+            log.warn("Unable to process data:");
+            log.warn(new String(chunk.getData()));
             log.warn(ExceptionUtil.getStackTrace(e));
           }
           dataSize += chunk.getData().length;
@@ -155,8 +165,15 @@ public class HBaseWriter extends PipelineableWriter {
       meta.close();
     } catch (Exception e) {
       log.error(ExceptionUtil.getStackTrace(e));
-      throw new WriterException("Failed to store data to HBase.");
-    }
+      if(connection != null) {
+        try {
+          connection.close();
+        } catch(IOException e2) {
+          connection = null;
+          throw new WriterException("HBase connection maybe leaking.");
+        }
+      }
+    }
     if (next != null) {
       rv = next.add(chunks); //pass data through
     }
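Taken together, these hunks give add() a recover-or-release lifecycle: recreate the connection lazily when it is missing or closed, and close it and drop the reference when a write fails. A standalone sketch of that pattern against the public HBase client API (the helper class and method names are illustrative, not Chukwa's):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;

    public class HBaseConnectionHelper {
      private final Configuration hconf;
      private Connection connection;

      public HBaseConnectionHelper(Configuration hconf) {
        this.hconf = hconf;
      }

      public synchronized Table table(String name) throws IOException {
        if (connection == null || connection.isClosed()) {
          connection = ConnectionFactory.createConnection(hconf);  // reconnect lazily
        }
        return connection.getTable(TableName.valueOf(name));
      }

      public synchronized void closeOnError() {
        try {
          if (connection != null) {
            connection.close();  // release sockets instead of leaking them
          }
        } catch (IOException e) {
          // swallow; the next table() call will attempt a fresh connection
        } finally {
          connection = null;
        }
      }
    }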
http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
index 9c719e4..b0aab83 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 import org.apache.log4j.Logger;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
 public class HadoopMetricsProcessor extends AbstractProcessor {
@@ -44,38 +45,53 @@ public class HadoopMetricsProcessor extends AbstractProcessor {
   @Override
   protected void parse(byte[] recordEntry) throws Throwable {
-    String body = new String(recordEntry, Charset.forName("UTF-8"));
-    int start = body.indexOf('{');
-    JSONObject json = (JSONObject) JSONValue.parse(body.substring(start));
+    String body = new String(recordEntry, Charset.forName("UTF-8"));
+    int start = 0;
+    int end = 0;
+    try {
+      while(true) {
+        start = body.indexOf('{', end);
+        end = body.indexOf('}', start)+1;
+        if (start == -1)
+          break;
-    time = ((Long) json.get(timestampField)).longValue();
-    String contextName = (String) json.get(contextNameField);
-    String recordName = (String) json.get(recordNameField);
-    String src = ((String) json.get(hostName)).toLowerCase();
-    if(json.get(processName)!=null) {
-      src = new StringBuilder(src).append(":").append(json.get(processName)).toString();
-    }
-    for(Entry<String, Object> entry : (Set<Map.Entry>) json.entrySet()) {
-      String keyName = entry.getKey();
-      if (timestampField.intern() == keyName.intern()) {
-        continue;
-      } else if (contextNameField.intern() == keyName.intern()) {
-        continue;
-      } else if (recordNameField.intern() == keyName.intern()) {
-        continue;
-      } else if (hostName.intern() == keyName.intern()) {
-        continue;
-      } else if (processName.intern() == keyName.intern()) {
-        continue;
-      } else {
-        if(json.get(keyName)!=null) {
-          String v = entry.getValue().toString();
-          String primaryKey = new StringBuilder(contextName).append(".")
+        JSONObject json = (JSONObject) JSONValue.parse(body.substring(start,end));
+
+        time = ((Long) json.get(timestampField)).longValue();
+        String contextName = (String) json.get(contextNameField);
+        String recordName = (String) json.get(recordNameField);
+        String src = ((String) json.get(hostName)).toLowerCase();
+        if(json.get(processName)!=null) {
+          src = new StringBuilder(src).append(":").append(json.get(processName)).toString();
+        }
+        for(Entry<String, Object> entry : (Set<Map.Entry>) json.entrySet()) {
+          String keyName = entry.getKey();
+          if (timestampField.intern() == keyName.intern()) {
+            continue;
+          } else if (contextNameField.intern() == keyName.intern()) {
+            continue;
+          } else if (recordNameField.intern() == keyName.intern()) {
+            continue;
+          } else if (hostName.intern() == keyName.intern()) {
+            continue;
+          } else if (processName.intern() == keyName.intern()) {
+            continue;
+          } else {
+            if(json.get(keyName)!=null) {
+              String v = entry.getValue().toString();
+              String primaryKey = new StringBuilder(contextName).append(".")
                 .append(recordName).append(".").append(keyName).toString();
-          addRecord(time, primaryKey, src, v.getBytes(Charset.forName("UTF-8")), output);
+              addRecord(time, primaryKey, src, v.getBytes(Charset.forName("UTF-8")), output);
+            }
           }
         }
       }
+    } catch(Exception e) {
+      LOG.warn("Unparsable data:");
+      LOG.warn(body);
+      LOG.warn(ExceptionUtil.getStackTrace(e));
+      // Skip unparsable data.
+    }
   }
 }
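The rewritten parse() no longer assumes one JSON object per record: it scans the body for successive {...} segments, parses each independently, and logs and skips anything unparsable instead of letting one bad segment abort the whole record. A standalone sketch of the scanning loop (the class and method names are illustrative; like the commit's indexOf-based scan, it assumes the metrics JSON contains no nested braces):

    import java.util.ArrayList;
    import java.util.List;

    public class JsonSegmentScanner {
      public static List<String> extractJsonObjects(String body) {
        List<String> segments = new ArrayList<String>();
        int start;
        int end = 0;
        while ((start = body.indexOf('{', end)) != -1) {
          end = body.indexOf('}', start) + 1;
          if (end == 0) {      // '{' without a matching '}': stop scanning
            break;
          }
          segments.add(body.substring(start, end));
        }
        return segments;
      }

      public static void main(String[] args) {
        // two metrics records concatenated into one chunk
        String body = "hdfs {\"ts\":1} mapred {\"ts\":2}";
        System.out.println(extractJsonObjects(body));  // [{"ts":1}, {"ts":2}]
      }
    }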
http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java b/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java
index cb97223..3e6189d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java
+++ b/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java
@@ -38,7 +38,7 @@ public class Log4jMetricsSink implements MetricsSink {
   protected String context = "HadoopMetrics";
   protected String host = "localhost";
   protected int port = 9095;
-  protected Logger out = null;
+  protected static Logger out = null;
   @Override
   public void init(SubsetConfiguration conf) {
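The static field applies the same share-one-instance idea in miniature: every Log4jMetricsSink created by the metrics system now reuses a single Logger (and whatever appender backs it, presumably a socket appender to the Chukwa agent) instead of wiring up a new one per instance. A minimal sketch of the idiom (illustrative, not the full Chukwa class):

    import org.apache.log4j.Logger;

    public class SharedLoggerSketch {
      private static Logger out;  // shared by every instance in the JVM

      private static synchronized Logger logger() {
        if (out == null) {
          out = Logger.getLogger(SharedLoggerSketch.class);  // built once
        }
        return out;
      }

      void putMetrics(String record) {
        logger().info(record);  // all instances route through one logger
      }
    }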