Posted to commits@chukwa.apache.org by as...@apache.org on 2010/03/23 19:33:12 UTC

svn commit: r926711 - in /hadoop/chukwa/branches/chukwa-0.4: ./ src/java/org/apache/hadoop/chukwa/datacollection/collector/ src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ src/java/org/apache/hadoop/chukwa/datacollection/writer/

Author: asrabkin
Date: Tue Mar 23 18:33:12 2010
New Revision: 926711

URL: http://svn.apache.org/viewvc?rev=926711&view=rev
Log:
CHUKWA-445. Realtime display at collector.

Added:
    hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
    hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
Modified:
    hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt
    hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java

Modified: hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt?rev=926711&r1=926710&r2=926711&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt (original)
+++ hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt Tue Mar 23 18:33:12 2010
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    CHUKWA-445. Realtime display at collector. (asrabkin)
+
     CHUKWA-454. DirTailingAdaptor can filter files. (Gerrit Jansen van Vuuren via asrabkin)
 
     CHUKWA-449. Utility to generate sequence file from log file. (Bill Graham via asrabkin)

Modified: hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java?rev=926711&r1=926710&r2=926711&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java (original)
+++ hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java Tue Mar 23 18:33:12 2010
@@ -22,8 +22,7 @@ package org.apache.hadoop.chukwa.datacol
 import org.mortbay.jetty.*;
 import org.mortbay.jetty.nio.SelectChannelConnector;
 import org.mortbay.jetty.servlet.*;
-import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
-import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.*;
 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
 import org.apache.hadoop.chukwa.datacollection.writer.*;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
@@ -107,6 +106,10 @@ public class CollectorStub {
       if(conf.getBoolean(HttpConnector.ASYNC_ACKS_OPT, false))
         root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/"+CommitCheckServlet.DEFAULT_PATH);
 
+      if(conf.getBoolean(LogDisplayServlet.ENABLED_OPT, false))
+        root.addServlet(new ServletHolder(new LogDisplayServlet(conf)), "/"+LogDisplayServlet.DEFAULT_PATH);
+
+      
       root.setAllowNullPathInfo(false);
 
       // Add in any user-specified servlets
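
The hunk above only mounts the new servlet when "chukwaCollector.showLogs.enabled"
(LogDisplayServlet.ENABLED_OPT) is true. As a minimal sketch of what that implies for
configuration, the two new keys can be set programmatically; only the key names come
from the servlet added in this commit, and the 4 MB buffer value below is purely
illustrative:

    import org.apache.hadoop.chukwa.datacollection.collector.servlet.LogDisplayServlet;
    import org.apache.hadoop.conf.Configuration;

    public class EnableLogDisplayExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // ENABLED_OPT == "chukwaCollector.showLogs.enabled"
        conf.setBoolean(LogDisplayServlet.ENABLED_OPT, true);
        // BUF_SIZE_OPT == "chukwaCollector.showLogs.buffer"; the servlet defaults to 1 MB
        conf.setLong(LogDisplayServlet.BUF_SIZE_OPT, 4L * 1024 * 1024);
        System.out.println("showLogs enabled: "
            + conf.getBoolean(LogDisplayServlet.ENABLED_OPT, false));
      }
    }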

Added: hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java?rev=926711&view=auto
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java (added)
+++ hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java Tue Mar 23 18:33:12 2010
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.collector.servlet;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.log4j.Logger;
+import java.io.*;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.chukwa.datacollection.writer.ExtractorWriter;
+import org.apache.hadoop.conf.Configuration;
+
+public class LogDisplayServlet extends HttpServlet {
+  
+  /*
+  static class StreamName {
+    byte[] md5;
+    public StreamName(Chunk c) {
+  
+    }
+    @Override
+    public int hashCode() {
+      int x=0;
+      for(int i=0; i< md5.length; ++i) {
+        x ^= (md5[i] << 4 * i);
+      }
+      return x;
+    }
+    
+    public boolean equals(Object x) {
+      if(x instanceof StreamName)
+        return Arrays.equals(md5, ((StreamName)x).md5);
+      else return false;
+    }
+  }*/
+  
+  public static final String DEFAULT_PATH = "logs";
+  public static final String ENABLED_OPT = "chukwaCollector.showLogs.enabled";
+  public static final String BUF_SIZE_OPT = "chukwaCollector.showLogs.buffer";
+  long BUF_SIZE = 1024* 1024;
+  
+  Configuration conf;
+  Map<String, Deque<Chunk>> chunksBySID = new HashMap<String, Deque<Chunk>>();
+  Queue<String> receivedSIDs = new LinkedList<String>();
+  long totalStoredSize = 0;
+
+  private static final long serialVersionUID = -4602082382919009285L;
+  protected static Logger log = Logger.getLogger(LogDisplayServlet.class);
+  
+  public LogDisplayServlet() {
+    conf = new Configuration();
+    ExtractorWriter.recipient = this;
+  }
+  
+  public LogDisplayServlet(Configuration c) {
+    conf = c;
+    ExtractorWriter.recipient = this;
+  }
+
+  public void init(ServletConfig servletConf) throws ServletException {
+    BUF_SIZE = conf.getLong(BUF_SIZE_OPT, BUF_SIZE);
+  }
+  
+  private String getSID(Chunk c) {
+    try { 
+      MessageDigest md;
+      md = MessageDigest.getInstance("MD5");
+  
+      md.update(c.getSource().getBytes());
+      md.update(c.getStreamName().getBytes());
+      md.update(c.getTags().getBytes());
+      StringBuilder sb = new StringBuilder();
+      byte[] bytes = md.digest();
+      for(int i=0; i < bytes.length; ++i) {
+        if( (bytes[i] & 0xF0) == 0)
+          sb.append('0');
+        sb.append( Integer.toHexString(0xFF & bytes[i]) );
+      }
+      return sb.toString();
+    } catch(NoSuchAlgorithmException n) {
+      log.fatal(n);
+      System.exit(0);
+      return null;
+    }
+  }
+  
+  
+  private void pruneOldEntries() {
+    while(totalStoredSize > BUF_SIZE) {
+      String queueToPrune = receivedSIDs.remove();
+      Deque<Chunk> stream = chunksBySID.get(queueToPrune);
+      assert !stream.isEmpty() : " expected a chunk in stream with ID " + queueToPrune;
+      Chunk c = stream.poll();
+      if(c != null)
+        totalStoredSize -= c.getData().length;
+      if(stream.isEmpty()) {  //remove empty deques and their names.
+        chunksBySID.remove(queueToPrune);
+      }
+    }
+  }
+  
+  public synchronized void add(List<Chunk> chunks) {
+    for(Chunk c : chunks) {
+      String sid = getSID(c);
+      Deque<Chunk> stream = chunksBySID.get(sid);
+      if(stream == null) {
+        stream = new LinkedList<Chunk>();
+        chunksBySID.put(sid, stream);
+      }
+      stream.add(c);
+      receivedSIDs.add(sid);
+      totalStoredSize += c.getData().length;
+    }
+    pruneOldEntries();
+  }
+  
+
+  @Override
+  protected synchronized void doGet(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException  {
+  
+    PrintStream out = new PrintStream(new BufferedOutputStream(resp.getOutputStream()));
+    resp.setStatus(200);
+    String path = req.getServletPath();
+    String streamID = req.getParameter("sid");
+    if (streamID != null) {
+      try {
+        Deque<Chunk> chunks = chunksBySID.get(streamID);
+        if(chunks != null) {
+          String streamName = getFriendlyName(chunks.peek());
+          out.println("<html><title>Chukwa:Received Data</title><body><h2>Data from "+ streamName + "</h2>");
+          out.println("<pre>");
+          for(Chunk c: chunks) {
+            out.write(c.getData());
+          }
+          out.println("</pre><hr><a href=\""+path+"\">Back to list of streams</a>");
+        } else
+          out.println("No data");
+      } catch(Exception e) {
+        out.println("<html><body>No data</body></html>");
+      }
+      out.println("</body></html>");
+    } else {
+      out.println("<html><title>Chukwa:Received Data</title><body><h2>Recently-seen streams</h2><ul>");
+      for(Map.Entry<String, Deque<Chunk>> sid: chunksBySID.entrySet()) 
+        out.println("<li> <a href=\"" + path + "?sid="+sid.getKey() + "\">"+ getFriendlyName(sid.getValue().peek()) + "</a></li>");
+      out.println("</ul></body></html>");
+    }
+    out.flush();
+  }
+
+  private String getFriendlyName(Chunk chunk) {
+    if(chunk != null)
+      return chunk.getTags() + "/" + chunk.getSource() + "/" + chunk.getStreamName();
+    else return "null";
+  }
+
+
+}
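
The servlet above serves two views: a GET on its base path ("/" + DEFAULT_PATH, i.e.
"/logs") lists recently-seen streams, keyed by an MD5 stream ID computed from each
chunk's source, stream name, and tags; a GET with "?sid=<id>" dumps the buffered chunks
for that one stream. A rough usage sketch, assuming a collector on localhost:8080 (the
host and port are assumptions, not something this patch sets):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;

    public class FetchRecentStreams {
      public static void main(String[] args) throws Exception {
        // The index page links each stream to "?sid=<md5>" for its raw chunk data.
        URL listing = new URL("http://localhost:8080/logs");
        try (BufferedReader in = new BufferedReader(
                 new InputStreamReader(listing.openStream()))) {
          String line;
          while ((line = in.readLine()) != null)
            System.out.println(line);
        }
      }
    }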

Added: hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java?rev=926711&view=auto
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java (added)
+++ hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java Tue Mar 23 18:33:12 2010
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.util.List;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.LogDisplayServlet;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
+import org.apache.hadoop.conf.Configuration;
+
+public class ExtractorWriter extends PipelineableWriter {
+
+  public static LogDisplayServlet recipient;
+  
+  @Override
+  public void close() throws WriterException {
+    next.close();
+  }
+
+  @Override
+  public void init(Configuration c) throws WriterException {
+  }
+  
+  public CommitStatus add(List<Chunk> chunks) throws WriterException {
+    if(recipient != null)
+      recipient.add(chunks);
+    if (next != null)
+      return next.add(chunks); //pass data through
+    else
+      return ChukwaWriter.COMMIT_OK;
+  }
+
+}
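
ExtractorWriter is a pass-through pipeline stage: add() copies each batch of chunks to
the static LogDisplayServlet recipient and then forwards it to the next writer, so the
collector keeps writing data as before. For it to feed the display, it has to sit in the
collector's writer pipeline ahead of the terminal writer. A sketch of that wiring,
assuming the usual PipelineStageWriter setup with SeqFileWriter as the final stage
(those config keys and classes are standard Chukwa, not part of this commit):

    import org.apache.hadoop.chukwa.datacollection.writer.ExtractorWriter;
    import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
    import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
    import org.apache.hadoop.conf.Configuration;

    public class WireExtractorWriterExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Route chunks through ExtractorWriter (which hands them to the servlet),
        // then on to the normal sequence-file writer.
        conf.set("chukwaCollector.writerClass",
                 PipelineStageWriter.class.getCanonicalName());
        conf.set("chukwaCollector.pipeline",
                 ExtractorWriter.class.getCanonicalName() + ","
                 + SeqFileWriter.class.getCanonicalName());
        System.out.println("pipeline = " + conf.get("chukwaCollector.pipeline"));
      }
    }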