You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/12/11 22:58:03 UTC

svn commit: r889831 - in /hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection: adaptor/DirTailingAdaptor.java adaptor/filetailer/FileTailingAdaptor.java connector/http/HttpConnector.java sender/ChukwaHttpSender.java

Author: asrabkin
Date: Fri Dec 11 21:58:01 2009
New Revision: 889831

URL: http://svn.apache.org/viewvc?rev=889831&view=rev
Log:
CHUKWA-418. More logging enhancement

Modified:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java?rev=889831&r1=889830&r2=889831&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java Fri Dec 11 21:58:01 2009
@@ -89,8 +89,9 @@
     if(!dir.isDirectory() ) {
       //Don't start tailing if we would have gotten it on the last pass 
       if(dir.lastModified() >= lastSweepStartTime) {
-        control.processAddCommand(
+        String newAdaptorID = control.processAddCommand(
             "add " + adaptorName +" " + type + " " + dir.getCanonicalPath() + " 0");
+        log.info("DirTailingAdaptor " + adaptorID +  "  started new adaptor " + newAdaptorID);
       } 
     } else {
       for(File f: dir.listFiles()) {

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=889831&r1=889830&r2=889831&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Fri Dec 11 21:58:01 2009
@@ -54,7 +54,7 @@
     log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: " + MAX_READ_SIZE);
     this.attempts = 0;
 
-    log.info("started file tailer on file " + toWatch
+    log.info("started file tailer " + adaptorID +  "  on file " + toWatch
         + " with first byte at offset " + offsetOfFirstByte);
   }
  

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java?rev=889831&r1=889830&r2=889831&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java Fri Dec 11 21:58:01 2009
@@ -170,7 +170,6 @@
         int toSend = newQueue.size();
         List<ChukwaHttpSender.CommitListEntry> results = connectorClient
             .send(newQueue);
-        log.info("sent " + toSend + " chunks, got back " + results.size() + " acks");
         // checkpoint the chunks which were committed
         for (ChukwaHttpSender.CommitListEntry cle : results) {
           agent.reportCommit(cle.adaptor, cle.uuid);

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=889831&r1=889830&r2=889831&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Fri Dec 11 21:58:01 2009
@@ -54,6 +54,8 @@
  * The Connector is responsible for picking what to send and to whom;
  * retry policy is encoded in the collectors iterator.
  * 
+ * This class is not thread safe. Synchronization is the caller's responsibility.
+ * 
  * <p>
  * On error, tries the list of available collectors, pauses for a minute, and
  * then repeats.
@@ -77,6 +79,7 @@
   static HttpClient client = null;
   static MultiThreadedHttpConnectionManager connectionManager = null;
   String currCollector = null;
+  int postID = 0;
 
   protected Iterator<String> collectors;
 
@@ -169,7 +172,8 @@
     List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
     List<CommitListEntry> commitResults = new ArrayList<CommitListEntry>();
 
-    log.info("collected " + toSend.size() + " chunks");
+    int thisPost = postID++;
+    log.info("collected " + toSend.size() + " chunks for post_"+thisPost);
 
     // Serialize each chunk in turn into its own DataOutputBuffer and add that
     // buffer to serializedEvents
@@ -194,9 +198,11 @@
 
     PostMethod method = new PostMethod();
     method.setRequestEntity(postData);
-    log.info(">>>>>> HTTP post to " + currCollector + " length = " + postData.getContentLength());
+    log.info(">>>>>> HTTP post_"+thisPost + " to " + currCollector + " length = " + postData.getContentLength());
 
-    return postAndParseResponse(method, commitResults);
+    List<CommitListEntry> results =  postAndParseResponse(method, commitResults);
+    log.info("post_" + thisPost + " sent " + toSend.size() + " chunks, got back " + results.size() + " acks");
+    return results;
   }
   
   /**
@@ -256,7 +262,7 @@
             Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
             retries--;
           } else {
-            log.error("No more collectors to try rolling over to; aborting");
+            log.error("No more collectors to try rolling over to; aborting post");
             throw new IOException("no collectors");
           }
         }