You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@chukwa.apache.org by ey...@apache.org on 2017/03/05 20:38:25 UTC

[1/2] chukwa git commit: CHUKWA-818. Convert Solr client to singleton to prevent connection leaks. (Eric Yang)

Repository: chukwa
Updated Branches:
  refs/heads/master ffd0d400e -> 971641927


CHUKWA-818. Convert Solr client to singleton to prevent connection leaks.  (Eric Yang)


Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/5bf50709
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/5bf50709
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/5bf50709

Branch: refs/heads/master
Commit: 5bf50709d070144bf958616111efa57c6c0475a5
Parents: ffd0d40
Author: Eric Yang <ey...@apache.org>
Authored: Sun Mar 5 12:36:15 2017 -0800
Committer: Eric Yang <ey...@apache.org>
Committed: Sun Mar 5 12:36:15 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                                       | 2 ++
 .../hadoop/chukwa/datacollection/writer/solr/SolrWriter.java      | 3 +--
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/chukwa/blob/5bf50709/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3780ac6..1d0eb56 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -29,6 +29,8 @@ Release 0.8 - 05/22/2016
 
   BUGS
 
+    CHUKWA-818. Convert Solr client to singleton to prevent connection leaks.  (Eric Yang)
+
     CHUKWA-808. Fix solr startup for Docker support.  (Eric Yang)
 
     CHUKWA-807. Update Docker support to current.  (Eric Yang)

http://git-wip-us.apache.org/repos/asf/chukwa/blob/5bf50709/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
index a0fd7ff..369b135 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
@@ -38,7 +38,7 @@ import org.apache.solr.common.SolrInputDocument;
 
 public class SolrWriter extends PipelineableWriter {
   private static Logger log = Logger.getLogger(SolrWriter.class);
-  private CloudSolrClient client;
+  private static CloudSolrClient client;
   private final static String ID = "id";
   private final static String SEQ_ID = "seqId";
   private final static String DATA_TYPE = "type";
@@ -119,7 +119,6 @@ public class SolrWriter extends PipelineableWriter {
       } catch (Exception e) {
         log.warn("Failed to store data to Solr Cloud.");
         log.warn(ExceptionUtil.getStackTrace(e));
-        client = null;
       }
     }
     try {


[2/2] chukwa git commit: CHUKWA-817. Prevent connections leaks when HBaseWriter or parser generate errors. (Eric Yang)

Posted by ey...@apache.org.
CHUKWA-817.  Prevent connections leaks when HBaseWriter or parser generate errors.  (Eric Yang)


Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/97164192
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/97164192
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/97164192

Branch: refs/heads/master
Commit: 971641927fd399143329bcc0480e0f1e6b032a74
Parents: 5bf5070
Author: Eric Yang <ey...@apache.org>
Authored: Sun Mar 5 12:38:17 2017 -0800
Committer: Eric Yang <ey...@apache.org>
Committed: Sun Mar 5 12:38:17 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  6 +-
 .../writer/PipelineStageWriter.java             |  9 ++-
 .../datacollection/writer/WriterException.java  |  5 +-
 .../writer/hbase/HBaseWriter.java               | 27 ++++++--
 .../hbase/HadoopMetricsProcessor.java           | 70 ++++++++++++--------
 .../inputtools/log4j/Log4jMetricsSink.java      |  2 +-
 6 files changed, 79 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1d0eb56..882c9ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,10 @@ Trunk (unreleased changes)
 
   BUGS
 
+    CHUKWA-818. Convert Solr client to singleton to prevent connection leaks.  (Eric Yang)
+
+    CHUKWA-817.  Prevent connections leaks when HBaseWriter or parser generate errors.  (Eric Yang)
+
     CHUKWA-813.  Fix logviewer for sorting log entries. (Eric Yang)
 
     CHUKWA-812.  Added throttle to dashboard save. (Eric Yang)
@@ -29,8 +33,6 @@ Release 0.8 - 05/22/2016
 
   BUGS
 
-    CHUKWA-818. Convert Solr client to singleton to prevent connection leaks.  (Eric Yang)
-
     CHUKWA-808. Fix solr startup for Docker support.  (Eric Yang)
 
     CHUKWA-807. Update Docker support to current.  (Eric Yang)

http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
index 141be20..bd98fb6 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
@@ -92,7 +92,11 @@ public class PipelineStageWriter implements ChukwaWriter {
         // if authentication type is kerberos; login using the specified kerberos principal and keytab file
         for(int i=0; i<classes.length; i++) {
           if(classes[i].contains("HBaseWriter")) {
-            loginToKerberos (conf);
+            try {
+              loginToKerberos (conf);
+            } catch(IOException e) {
+              throw new WriterException("Unable to login to Kerberos.");
+            }
           }
         }
         
@@ -111,8 +115,7 @@ public class PipelineStageWriter implements ChukwaWriter {
             writer = (ChukwaWriter) st; // one stage pipeline
         }
         return;
-      } catch (IOException | 
-          WriterException | 
+      } catch (WriterException | 
           ClassNotFoundException | 
           IllegalAccessException | 
           InstantiationException e) {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java
index e2a2e45..dc0f0b1 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java
@@ -17,13 +17,14 @@
  */
 package org.apache.hadoop.chukwa.datacollection.writer;
 
+import java.io.IOException;
 
-public class WriterException extends Exception {
+public class WriterException extends IOException {
 
   /**
 	 * 
 	 */
-  private static final long serialVersionUID = -4207275200546397145L;
+  private static final long serialVersionUID = -4207275200546397146L;
 
   public WriterException() {
   }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
index 5ba87bd..4a67226 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
@@ -132,9 +132,18 @@ public class HBaseWriter extends PipelineableWriter {
   @Override
   public CommitStatus add(List<Chunk> chunks) throws WriterException {
     CommitStatus rv = ChukwaWriter.COMMIT_OK;
+    Table hbase;
+    Table meta;
     try {
-      Table hbase = connection.getTable(TableName.valueOf(CHUKWA_TABLE));
-      Table meta = connection.getTable(TableName.valueOf(CHUKWA_META_TABLE));
+      if (connection == null || connection.isClosed()) {
+        try {
+          connection = ConnectionFactory.createConnection(hconf);
+        } catch (IOException e) {
+          throw new WriterException("HBase is offline, retry later...");
+        }
+      }
+      hbase = connection.getTable(TableName.valueOf(CHUKWA_TABLE));
+      meta = connection.getTable(TableName.valueOf(CHUKWA_META_TABLE));
       for(Chunk chunk : chunks) {
         synchronized (this) {
           try {
@@ -143,7 +152,8 @@ public class HBaseWriter extends PipelineableWriter {
             hbase.put(output);
             meta.put(reporter.getInfo());
           } catch (Throwable e) {
-            log.warn(output);
+            log.warn("Unable to process data:");
+            log.warn(new String(chunk.getData()));
             log.warn(ExceptionUtil.getStackTrace(e));
           }
           dataSize += chunk.getData().length;
@@ -155,8 +165,15 @@ public class HBaseWriter extends PipelineableWriter {
       meta.close();
     } catch (Exception e) {
       log.error(ExceptionUtil.getStackTrace(e));
-      throw new WriterException("Failed to store data to HBase.");
-    }    
+      if(connection != null) {
+        try {
+          connection.close();
+        } catch(IOException e2) {
+          connection = null;
+          throw new WriterException("HBase connection maybe leaking.");
+        }
+      }
+    } 
     if (next != null) {
       rv = next.add(chunks); //pass data through
     }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
index 9c719e4..b0aab83 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 import org.apache.log4j.Logger;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
 
 public class HadoopMetricsProcessor extends AbstractProcessor {
   
@@ -44,38 +45,53 @@ public class HadoopMetricsProcessor extends AbstractProcessor {
 
   @Override
   protected void parse(byte[] recordEntry) throws Throwable {
-      String body = new String(recordEntry, Charset.forName("UTF-8"));
-      int start = body.indexOf('{');
-      JSONObject json = (JSONObject) JSONValue.parse(body.substring(start));
+    String body = new String(recordEntry, Charset.forName("UTF-8"));
+    int start = 0;
+    int end = 0;
+    try {
+      while(true) {
+        start = body.indexOf('{', end);
+        end = body.indexOf('}', start)+1;
+        if (start == -1)
+          break;
 
-      time = ((Long) json.get(timestampField)).longValue();
-      String contextName = (String) json.get(contextNameField);
-      String recordName = (String) json.get(recordNameField);
-      String src = ((String) json.get(hostName)).toLowerCase();
-      if(json.get(processName)!=null) {
-        src = new StringBuilder(src).append(":").append(json.get(processName)).toString();
-      }
-      for(Entry<String, Object> entry : (Set<Map.Entry>) json.entrySet()) {
-        String keyName = entry.getKey();
-        if (timestampField.intern() == keyName.intern()) {
-          continue;
-        } else if (contextNameField.intern() == keyName.intern()) {
-          continue;
-        } else if (recordNameField.intern() == keyName.intern()) {
-          continue;
-        } else if (hostName.intern() == keyName.intern()) {
-          continue;
-        } else if (processName.intern() == keyName.intern()) {
-          continue;
-        } else {
-          if(json.get(keyName)!=null) {
-            String v = entry.getValue().toString();
-            String primaryKey = new StringBuilder(contextName).append(".")
+        JSONObject json = (JSONObject) JSONValue.parse(body.substring(start,end));
+
+        time = ((Long) json.get(timestampField)).longValue();
+        String contextName = (String) json.get(contextNameField);
+        String recordName = (String) json.get(recordNameField);
+        String src = ((String) json.get(hostName)).toLowerCase();
+        if(json.get(processName)!=null) {
+          src = new StringBuilder(src).append(":").append(json.get(processName)).toString();
+        }
+        for(Entry<String, Object> entry : (Set<Map.Entry>) json.entrySet()) {
+          String keyName = entry.getKey();
+          if (timestampField.intern() == keyName.intern()) {
+            continue;
+          } else if (contextNameField.intern() == keyName.intern()) {
+            continue;
+          } else if (recordNameField.intern() == keyName.intern()) {
+            continue;
+          } else if (hostName.intern() == keyName.intern()) {
+            continue;
+          } else if (processName.intern() == keyName.intern()) {
+            continue;
+          } else {
+            if(json.get(keyName)!=null) {
+              String v = entry.getValue().toString();
+              String primaryKey = new StringBuilder(contextName).append(".")
                 .append(recordName).append(".").append(keyName).toString();
-            addRecord(time, primaryKey, src, v.getBytes(Charset.forName("UTF-8")), output);
+              addRecord(time, primaryKey, src, v.getBytes(Charset.forName("UTF-8")), output);
+            }
           }
         }
       }
+    } catch(Exception e) {
+      LOG.warn("Unparsable data:");
+      LOG.warn(body);
+      LOG.warn(ExceptionUtil.getStackTrace(e));
+      // Skip unparsable data.
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/97164192/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java b/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java
index cb97223..3e6189d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java
+++ b/src/main/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4jMetricsSink.java
@@ -38,7 +38,7 @@ public class Log4jMetricsSink implements MetricsSink {
   protected String context = "HadoopMetrics";
   protected String host = "localhost";
   protected int port = 9095;
-  protected Logger out = null;
+  protected static Logger out = null;
 
   @Override
   public void init(SubsetConfiguration conf) {