You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2010/03/23 19:33:12 UTC
svn commit: r926711 - in /hadoop/chukwa/branches/chukwa-0.4: ./
src/java/org/apache/hadoop/chukwa/datacollection/collector/
src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/
src/java/org/apache/hadoop/chukwa/datacollection/writer/
Author: asrabkin
Date: Tue Mar 23 18:33:12 2010
New Revision: 926711
URL: http://svn.apache.org/viewvc?rev=926711&view=rev
Log:
CHUKWA-445. Realtime display at collector.
Added:
hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
Modified:
hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt
hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
Modified: hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt?rev=926711&r1=926710&r2=926711&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt (original)
+++ hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt Tue Mar 23 18:33:12 2010
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
NEW FEATURES
+ CHUKWA-445. Realtime display at collector. (asrabkin)
+
CHUKWA-454. DirTailingAdaptor can filter files. (Gerrit Jansen van Vuuren via asrabkin)
CHUKWA-449. Utility to generate sequence file from log file. (Bill Graham via asrabkin)
Modified: hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java?rev=926711&r1=926710&r2=926711&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java (original)
+++ hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java Tue Mar 23 18:33:12 2010
@@ -22,8 +22,7 @@ package org.apache.hadoop.chukwa.datacol
import org.mortbay.jetty.*;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.servlet.*;
-import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
-import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.*;
import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
import org.apache.hadoop.chukwa.datacollection.writer.*;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
@@ -107,6 +106,10 @@ public class CollectorStub {
if(conf.getBoolean(HttpConnector.ASYNC_ACKS_OPT, false))
root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/"+CommitCheckServlet.DEFAULT_PATH);
+ if(conf.getBoolean(LogDisplayServlet.ENABLED_OPT, false))
+ root.addServlet(new ServletHolder(new LogDisplayServlet(conf)), "/"+LogDisplayServlet.DEFAULT_PATH);
+
+
root.setAllowNullPathInfo(false);
// Add in any user-specified servlets
Added: hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java?rev=926711&view=auto
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java (added)
+++ hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java Tue Mar 23 18:33:12 2010
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.collector.servlet;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.log4j.Logger;
+import java.io.*;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.chukwa.datacollection.writer.ExtractorWriter;
+import org.apache.hadoop.conf.Configuration;
+
+public class LogDisplayServlet extends HttpServlet {
+
+ /*
+ static class StreamName {
+ byte[] md5;
+ public StreamName(Chunk c) {
+
+ }
+ @Override
+ public int hashCode() {
+ int x=0;
+ for(int i=0; i< md5.length; ++i) {
+ x ^= (md5[i] << 4 * i);
+ }
+ return x;
+ }
+
+ public boolean equals(Object x) {
+ if(x instanceof StreamName)
+ return Arrays.equals(md5, ((StreamName)x).md5);
+ else return false;
+ }
+ }*/
+
+ public static final String DEFAULT_PATH = "logs";
+ public static final String ENABLED_OPT = "chukwaCollector.showLogs.enabled";
+ public static final String BUF_SIZE_OPT = "chukwaCollector.showLogs.buffer";
+ long BUF_SIZE = 1024* 1024;
+
+ Configuration conf;
+ Map<String, Deque<Chunk>> chunksBySID = new HashMap<String, Deque<Chunk>>();
+ Queue<String> receivedSIDs = new LinkedList<String>();
+ long totalStoredSize = 0;
+
+ private static final long serialVersionUID = -4602082382919009285L;
+ protected static Logger log = Logger.getLogger(LogDisplayServlet.class);
+
+ public LogDisplayServlet() {
+ conf = new Configuration();
+ ExtractorWriter.recipient = this;
+ }
+
+ public LogDisplayServlet(Configuration c) {
+ conf = c;
+ ExtractorWriter.recipient = this;
+ }
+
+ public void init(ServletConfig servletConf) throws ServletException {
+ BUF_SIZE = conf.getLong(BUF_SIZE_OPT, BUF_SIZE);
+ }
+
+ private String getSID(Chunk c) {
+ try {
+ MessageDigest md;
+ md = MessageDigest.getInstance("MD5");
+
+ md.update(c.getSource().getBytes());
+ md.update(c.getStreamName().getBytes());
+ md.update(c.getTags().getBytes());
+ StringBuilder sb = new StringBuilder();
+ byte[] bytes = md.digest();
+ for(int i=0; i < bytes.length; ++i) {
+ if( (bytes[i] & 0xF0) == 0)
+ sb.append('0');
+ sb.append( Integer.toHexString(0xFF & bytes[i]) );
+ }
+ return sb.toString();
+ } catch(NoSuchAlgorithmException n) {
+ log.fatal(n);
+ System.exit(0);
+ return null;
+ }
+ }
+
+
+ private void pruneOldEntries() {
+ while(totalStoredSize > BUF_SIZE) {
+ String queueToPrune = receivedSIDs.remove();
+ Deque<Chunk> stream = chunksBySID.get(queueToPrune);
+ assert !stream.isEmpty() : " expected a chunk in stream with ID " + queueToPrune;
+ Chunk c = stream.poll();
+ if(c != null)
+ totalStoredSize -= c.getData().length;
+ if(stream.isEmpty()) { //remove empty deques and their names.
+ chunksBySID.remove(queueToPrune);
+ }
+ }
+ }
+
+ public synchronized void add(List<Chunk> chunks) {
+ for(Chunk c : chunks) {
+ String sid = getSID(c);
+ Deque<Chunk> stream = chunksBySID.get(sid);
+ if(stream == null) {
+ stream = new LinkedList<Chunk>();
+ chunksBySID.put(sid, stream);
+ }
+ stream.add(c);
+ receivedSIDs.add(sid);
+ totalStoredSize += c.getData().length;
+ }
+ pruneOldEntries();
+ }
+
+
+ @Override
+ protected synchronized void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+
+ PrintStream out = new PrintStream(new BufferedOutputStream(resp.getOutputStream()));
+ resp.setStatus(200);
+ String path = req.getServletPath();
+ String streamID = req.getParameter("sid");
+ if (streamID != null) {
+ try {
+ Deque<Chunk> chunks = chunksBySID.get(streamID);
+ if(chunks != null) {
+ String streamName = getFriendlyName(chunks.peek());
+ out.println("<html><title>Chukwa:Received Data</title><body><h2>Data from "+ streamName + "</h2>");
+ out.println("<pre>");
+ for(Chunk c: chunks) {
+ out.write(c.getData());
+ }
+ out.println("</pre><hr><a href=\""+path+"\">Back to list of streams</a>");
+ } else
+ out.println("No data");
+ } catch(Exception e) {
+ out.println("<html><body>No data</body></html>");
+ }
+ out.println("</body></html>");
+ } else {
+ out.println("<html><title>Chukwa:Received Data</title><body><h2>Recently-seen streams</h2><ul>");
+ for(Map.Entry<String, Deque<Chunk>> sid: chunksBySID.entrySet())
+ out.println("<li> <a href=\"" + path + "?sid="+sid.getKey() + "\">"+ getFriendlyName(sid.getValue().peek()) + "</a></li>");
+ out.println("</ul></body></html>");
+ }
+ out.flush();
+ }
+
+ private String getFriendlyName(Chunk chunk) {
+ if(chunk != null)
+ return chunk.getTags() + "/" + chunk.getSource() + "/" + chunk.getStreamName();
+ else return "null";
+ }
+
+
+}
Added: hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java?rev=926711&view=auto
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java (added)
+++ hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java Tue Mar 23 18:33:12 2010
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.util.List;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.LogDisplayServlet;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
+import org.apache.hadoop.conf.Configuration;
+
+public class ExtractorWriter extends PipelineableWriter {
+
+ public static LogDisplayServlet recipient;
+
+ @Override
+ public void close() throws WriterException {
+ next.close();
+ }
+
+ @Override
+ public void init(Configuration c) throws WriterException {
+ }
+
+ public CommitStatus add(List<Chunk> chunks) throws WriterException {
+ if(recipient != null)
+ recipient.add(chunks);
+ if (next != null)
+ return next.add(chunks); //pass data through
+ else
+ return ChukwaWriter.COMMIT_OK;
+ }
+
+}