You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@chukwa.apache.org by ey...@apache.org on 2015/07/26 04:09:00 UTC
[7/8] chukwa git commit: CHUKWA-771. Improved code quality issues
identified by findbugs. (Eric Yang)
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
index 14cf514..4bfae2d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
@@ -35,8 +35,6 @@ import org.apache.log4j.Logger;
class FileAdaptorTailer extends Thread {
static Logger log = Logger.getLogger(FileAdaptorTailer.class);
private List<FileAdaptor> adaptors = null;
- private static Configuration conf = null;
- private Object lock = new Object();
/**
* How often to call each adaptor.
@@ -46,16 +44,13 @@ class FileAdaptorTailer extends Thread {
public FileAdaptorTailer() {
-
- if (conf == null) {
- ChukwaAgent agent = ChukwaAgent.getAgent();
- if (agent != null) {
- conf = agent.getConfiguration();
- if (conf != null) {
- SAMPLE_PERIOD_MS = conf.getInt(
- "chukwaAgent.adaptor.context.switch.time",
- DEFAULT_SAMPLE_PERIOD_MS);
- }
+ ChukwaAgent agent = ChukwaAgent.getAgent();
+ if (agent != null) {
+ Configuration conf = agent.getConfiguration();
+ if (conf != null) {
+ SAMPLE_PERIOD_MS = conf.getInt(
+ "chukwaAgent.adaptor.context.switch.time",
+ DEFAULT_SAMPLE_PERIOD_MS);
}
}
@@ -70,17 +65,6 @@ class FileAdaptorTailer extends Thread {
while(true) {
try {
- while (adaptors.size() == 0) {
- synchronized (lock) {
- try {
- log.info("Waiting queue is empty");
- lock.wait();
- } catch (InterruptedException e) {
- // do nothing
- }
- }
- }
-
long startTime = System.currentTimeMillis();
for (FileAdaptor adaptor: adaptors) {
log.info("calling sendFile for " + adaptor.toWatch.getCanonicalPath());
@@ -100,9 +84,6 @@ class FileAdaptorTailer extends Thread {
public void addFileAdaptor(FileAdaptor adaptor) {
adaptors.add(adaptor);
- synchronized (lock) {
- lock.notifyAll();
- }
}
public void removeFileAdaptor(FileAdaptor adaptor) {
@@ -119,7 +100,7 @@ public class FileAdaptor extends AbstractAdaptor {
static FileAdaptorTailer tailer = null;
static final int DEFAULT_TIMEOUT_PERIOD = 5*60*1000;
- static int TIMEOUT_PERIOD = DEFAULT_TIMEOUT_PERIOD;
+ int TIMEOUT_PERIOD = DEFAULT_TIMEOUT_PERIOD;
static {
tailer = new FileAdaptorTailer();
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
index c07f6fa..5f4928a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.chukwa.datacollection.adaptor;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
@@ -105,9 +104,16 @@ public class JMXAdaptor extends AbstractAdaptor{
while(!shutdown){
try{
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(jmx_pw_file.getAbsolutePath()), Charset.forName("UTF-8")));
- String[] creds = br.readLine().split(" ");
- Map<String, String[]> env = new HashMap<String, String[]>();
- env.put(JMXConnector.CREDENTIALS, creds);
+ String buffer = br.readLine();
+ String[] creds = null;
+ if(buffer != null ) {
+ creds = buffer.split(" ");
+ }
+ br.close();
+ Map<String, String[]> env = new HashMap<String, String[]>();
+ if(creds!=null) {
+ env.put(JMXConnector.CREDENTIALS, creds);
+ }
jmxc = JMXConnectorFactory.connect(url, env);
mbsc = jmxc.getMBeanServerConnection();
if(timer == null) {
@@ -131,7 +137,7 @@ public class JMXAdaptor extends AbstractAdaptor{
timer.cancel();
timer = null;
shutdown = true;
- }
+ }
}
}
@@ -181,7 +187,7 @@ public class JMXAdaptor extends AbstractAdaptor{
Descriptor d = mb.getDescriptor();
val = mbsc.getAttribute(oname, mb.getName());
if(d.getFieldNames().length > 0){ //this is an open mbean
- OpenType openType = (OpenType)d.getFieldValue("openType");
+ OpenType<?> openType = (OpenType<?>)d.getFieldValue("openType");
if(openType.isArray()){
Object[] valarray = (Object[])val;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
index 39af580..5011f70 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
@@ -50,7 +50,7 @@ public class OozieAdaptor extends AbstractAdaptor {
private final ScheduledExecutorService scheduler = Executors
.newScheduledThreadPool(1);
private static final long initialDelay = 60; // seconds
- private static long periodicity = 60; // seconds
+ private long periodicity = 60; // seconds
private ScheduledFuture<?> scheduledCollectorThread;
@Override
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
index b37be9c..072c151 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
@@ -140,16 +140,14 @@ public class SocketAdaptor extends AbstractAdaptor {
}
}
} catch(java.io.EOFException e) {
- log.info("Caught java.io.EOFException closing conneciton.");
+ log.debug("Caught java.io.EOFException:", e);
} catch(java.net.SocketException e) {
- log.info("Caught java.net.SocketException closing conneciton.");
+ log.debug("Caught java.net.SocketException:", e);
} catch(InterruptedIOException e) {
Thread.currentThread().interrupt();
- log.info("Caught java.io.InterruptedIOException: "+e);
- log.info("Closing connection.");
+ log.debug("Caught java.io.InterruptedIOException: ", e);
} catch(IOException e) {
- log.info("Caught java.io.IOException: "+e);
- log.info("Closing connection.");
+ log.debug("Caught java.io.IOException: "+e);
} catch(Exception e) {
log.error("Unexpected exception. Closing conneciton.", e);
} finally {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
index 07f6c66..50dec64 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
@@ -76,7 +76,7 @@ public class SyslogAdaptor extends UDPAdaptor {
facility = facility / 8;
dataType = facilityMap.get(facility);
} catch (NumberFormatException nfe) {
- log.warn("Unsupported format detected by SyslogAdaptor:"+trimmedBuf);
+ log.warn("Unsupported format detected by SyslogAdaptor:"+Arrays.toString(trimmedBuf));
}
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java
index dcf0600..cb04aae 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.chukwa.datacollection.adaptor;
-import java.util.*;
import java.io.*;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
@@ -37,7 +36,7 @@ public class WriteaheadBuffered extends AbstractWrapper {
@Override
- public synchronized void add(Chunk event) throws InterruptedException {
+ public void add(Chunk event) throws InterruptedException {
try {
event.write(outToDisk);
outToDisk.flush();
@@ -85,7 +84,7 @@ public class WriteaheadBuffered extends AbstractWrapper {
}
@Override
- public synchronized void committed(long l) {
+ public void committed(long l) {
try {
long bytesOutstanding = highestSentOffset - l;
@@ -93,7 +92,10 @@ public class WriteaheadBuffered extends AbstractWrapper {
fSize = 0;
outToDisk.close();
File outBufTmp = new File(outBuf.getAbsoluteFile(), outBuf.getName() + ".tmp");
- outBuf.renameTo(outBufTmp);
+ if(!outBuf.renameTo(outBufTmp)) {
+ log.warn("Cannot rename temp file "+outBuf.getAbsolutePath()+
+ " to "+outBufTmp.getAbsolutePath());
+ };
outToDisk = new DataOutputStream(new FileOutputStream(outBuf, false));
DataInputStream dis = new DataInputStream(new FileInputStream(outBufTmp));
while(dis.available() > 0) {
@@ -104,7 +106,9 @@ public class WriteaheadBuffered extends AbstractWrapper {
}
}
dis.close();
- outBufTmp.delete();
+ if(!outBufTmp.delete()) {
+ log.warn("Can not delete temp file: "+outBufTmp.getAbsolutePath());
+ };
}
} catch(IOException e) {
log.error(e);
@@ -114,8 +118,11 @@ public class WriteaheadBuffered extends AbstractWrapper {
@Override
public long shutdown(AdaptorShutdownPolicy p) throws AdaptorException {
- if(p != RESTARTING)
- outBuf.delete();
+ if(p != RESTARTING) {
+ if(outBuf.delete()) {
+ log.warn("Cannot delete output buffer file:"+outBuf.getAbsolutePath());
+ };
+ }
return inner.shutdown(p);
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
index 5fea073..9fc25b9 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.File;
import org.apache.hadoop.chukwa.datacollection.adaptor.*;
-import org.apache.hadoop.chukwa.util.ExceptionUtil;
/**
* An adaptor that repeatedly tails a specified file, sending the new bytes.
@@ -118,7 +117,7 @@ public class FileTailingAdaptor extends LWFTAdaptor {
* @param eq the queue to write Chunks into
*/
@Override
- public synchronized boolean tailFile()
+ public boolean tailFile()
throws InterruptedException {
boolean hasMoreData = false;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
index 9da09d5..dc867d5 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
@@ -51,7 +51,7 @@ public class LWFTAdaptor extends AbstractAdaptor {
public static final String MAX_READ_SIZE_OPT =
"chukwaAgent.fileTailingAdaptor.maxReadSize";
- static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
+ int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
static Logger log;
static FileTailer tailer;
@@ -200,7 +200,7 @@ public class LWFTAdaptor extends AbstractAdaptor {
return hasMoreData;
}
- public synchronized boolean tailFile()
+ public boolean tailFile()
throws InterruptedException {
boolean hasMoreData = false;
try {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
index 2fa82fe..cd8d53f 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
@@ -26,6 +26,8 @@ import java.util.regex.Pattern;
import java.util.Collections;
import java.util.LinkedList;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
/**
* Checkpoint state:
* date modified of most-recently tailed file, offset of first byte of that file,
@@ -54,6 +56,22 @@ public class RCheckFTAdaptor extends LWFTAdaptor implements FileFilter {
//just a heuristic that hasn't been tuned yet
else return (o.f.getName().compareTo(f.getName()));
}
+
+ @Override
+ public boolean equals(Object o) {
+ if(o instanceof FPair) {
+ return mod == ((FPair) o).mod;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(643, 1321).
+ append(this.mod).
+ toHashCode();
+ }
}
long prevFileLastModDate = 0;
@@ -129,7 +147,7 @@ public class RCheckFTAdaptor extends LWFTAdaptor implements FileFilter {
}
@Override
- public synchronized boolean tailFile()
+ public boolean tailFile()
throws InterruptedException {
boolean hasMoreData = false;
try {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java
index 082dd58..e924172 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.chukwa.datacollection.adaptor.heartbeat;
+import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
@@ -54,7 +55,7 @@ public class HttpStatusChecker implements StatusChecker {
connection = (HttpURLConnection)url.openConnection();
connection.connect();
status.put("status", "running");
- } catch (Exception e) {
+ } catch (IOException e) {
status.put("status", "stopped");
} finally {
if(connection != null){
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
index 991cdaf..79f8db6 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
+import java.nio.charset.Charset;
+
import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
@@ -100,7 +102,7 @@ public class JMSAdaptor extends AbstractAdaptor {
bytesReceived += bytes.length;
if (log.isDebugEnabled()) {
- log.debug("Adding Chunk from JMS message: " + new String(bytes));
+ log.debug("Adding Chunk from JMS message: " + new String(bytes, Charset.forName("UTF-8")));
}
Chunk c = new ChunkImpl(type, source, bytesReceived, bytes, JMSAdaptor.this);
@@ -142,6 +144,7 @@ public class JMSAdaptor extends AbstractAdaptor {
String transformerName = null;
String transformerConfs = null;
+ StringBuilder transformerConfsBuffer = new StringBuilder();
for (int i = 1; i < tokens.length; i++) {
String value = tokens[i];
if ("-t".equals(value)) {
@@ -168,17 +171,19 @@ public class JMSAdaptor extends AbstractAdaptor {
transformerName = tokens[++i];
}
else if ("-p".equals(value)) {
- transformerConfs = tokens[++i];
+ transformerConfsBuffer.append(tokens[++i]);
+ transformerConfs = transformerConfsBuffer.toString();
// transformerConfs can have multiple words
- if (transformerConfs.startsWith("\"")) {
+ if (transformerConfsBuffer.toString().startsWith("\"")) {
for(int j = i + 1; j < tokens.length; j++) {
- transformerConfs = transformerConfs + " " + tokens[++i];
+ transformerConfsBuffer.append(" ");
+ transformerConfsBuffer.append(tokens[++i]);
if(tokens[j].endsWith("\"")) {
break;
}
}
- transformerConfs = trimQuotes(transformerConfs);
+ transformerConfs = trimQuotes(transformerConfsBuffer.toString());
}
}
}
@@ -196,7 +201,7 @@ public class JMSAdaptor extends AbstractAdaptor {
// create transformer
if (transformerName != null) {
try {
- Class classDefinition = Class.forName(transformerName);
+ Class<?> classDefinition = Class.forName(transformerName);
Object object = classDefinition.newInstance();
transformer = (JMSMessageTransformer)object;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
index b0ef917..facff2d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
@@ -73,14 +73,16 @@ public class JMSMessagePropertyTransformer implements JMSMessageTransformer {
String token = tokens[i];
if ("-d".equals(token) && i <= tokens.length - 2) {
- String value = tokens[++i];
+ StringBuilder value = new StringBuilder();
+ value.append(tokens[++i]);
// we lost all spaces with the split, so we have to put them back, yuck.
while (i <= tokens.length - 2 && !tokens[i + 1].startsWith("-")) {
- value = value + " " + tokens[++i];
+ value.append(" ");
+ value.append(tokens[++i]);
}
- delimiter = trimSingleQuotes(value);
+ delimiter = trimSingleQuotes(value.toString());
}
else if ("-r".equals(token) && i <= tokens.length - 2) {
// requiredPropertyNames = null means all are required.
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java
index 9f98f4a..52d4cb8 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
+import java.nio.charset.Charset;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,7 +46,7 @@ public class JMSTextMessageTransformer implements JMSMessageTransformer {
String text = ((TextMessage)message).getText();
if (text != null && text.length() > 0) {
- return text.getBytes();
+ return text.getBytes(Charset.forName("UTF-8"));
}
return null;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
index cde2868..88ba9bc 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
@@ -21,7 +21,6 @@ import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
-import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
import org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender;
import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
import org.apache.log4j.Logger;
@@ -52,9 +51,8 @@ public class AdaptorResetThread extends Thread {
public AdaptorResetThread(Configuration conf, ChukwaAgent a) {
//
- timeout = conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/3)
- + conf.getInt(AsyncAckSender.POLLPERIOD_OPT, timeout/3)
- + conf.getInt(CommitCheckServlet.SCANPERIOD_OPT, timeout/3);
+ timeout = conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/2)
+ + conf.getInt(AsyncAckSender.POLLPERIOD_OPT, timeout/2);
timeout = conf.getInt(TIMEOUT_OPT, timeout); //unless overridden
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
index dda7888..d024180 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
@@ -74,7 +74,7 @@ public class AgentControlSocketListener extends Thread {
InputStream in = connection.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(in, Charset.forName("UTF-8")));
PrintStream out = new PrintStream(new BufferedOutputStream(connection
- .getOutputStream()));
+ .getOutputStream()), true, "UTF-8");
String cmd = null;
while ((cmd = br.readLine()) != null) {
processCommand(cmd, out);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java
index 78c307e..03ed635 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.chukwa.datacollection.agent.rest;
-import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlAccessorType;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
index 70edc2a..dc44975 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
@@ -21,9 +21,6 @@ import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.OffsetStatsManager;
-import org.apache.hadoop.chukwa.util.ExceptionUtil;
-import org.apache.commons.lang.StringEscapeUtils;
-import org.json.simple.JSONObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,15 +28,11 @@ import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
import javax.ws.rs.PathParam;
-import javax.ws.rs.QueryParam;
import javax.ws.rs.DELETE;
import javax.ws.rs.POST;
import javax.ws.rs.Consumes;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.MediaType;
-import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletResponse;
import java.text.DecimalFormat;
@@ -54,7 +47,7 @@ import java.util.Map;
public class AdaptorController {
private static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat();
- private static final Log log = LogFactory.getLog(AdaptorController.class);
+ private static final Log LOG = LogFactory.getLog(AdaptorController.class);
static {
DECIMAL_FORMAT.setMinimumFractionDigits(2);
@@ -95,6 +88,8 @@ public class AdaptorController {
String adaptorId = agent.processAddCommandE(addCommand.toString());
return doGetAdaptor(adaptorId);
} catch (AdaptorException e) {
+ LOG.warn("Could not add adaptor for data type: '" + ac.getDataType() +
+ "', error: " + e.getMessage());
return badRequestResponse("Could not add adaptor for data type: '" + ac.getDataType() +
"', error: " + e.getMessage());
}
@@ -180,7 +175,7 @@ public class AdaptorController {
protected AdaptorInfo buildAdaptor(String adaptorId) {
ChukwaAgent agent = ChukwaAgent.getAgent();
Adaptor adaptor = agent.getAdaptor(adaptorId);
- OffsetStatsManager adaptorStats = agent.getAdaptorStatsManager();
+ OffsetStatsManager<Adaptor> adaptorStats = agent.getAdaptorStatsManager();
AdaptorInfo info = new AdaptorInfo();
info.setId(adaptorId);
@@ -205,7 +200,7 @@ public class AdaptorController {
AdaptorList list = new AdaptorList();
for(String name : adaptorMap.keySet()) {
Adaptor adaptor = agent.getAdaptor(name);
- OffsetStatsManager adaptorStats = agent.getAdaptorStatsManager();
+ OffsetStatsManager<Adaptor> adaptorStats = agent.getAdaptorStatsManager();
AdaptorInfo info = new AdaptorInfo();
info.setId(name);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
index d4c2df4..df1616b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
@@ -20,13 +20,18 @@ package org.apache.hadoop.chukwa.datacollection.collector.servlet;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
+
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+
import org.apache.log4j.Logger;
+
import java.util.*;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
@@ -39,8 +44,8 @@ public class CommitCheckServlet extends HttpServlet {
private static final long serialVersionUID = -4627538252371890849L;
protected final static Logger log = Logger.getLogger(CommitCheckServlet.class);
- CommitCheckThread commitCheck;
- Configuration conf;
+ transient CommitCheckThread commitCheck;
+ transient Configuration conf;
//interval at which to scan the filesystem, ms
public static final String SCANPERIOD_OPT = "chukwaCollector.asyncAcks.scanperiod";
@@ -78,7 +83,7 @@ public class CommitCheckServlet extends HttpServlet {
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
- PrintStream out = new PrintStream(resp.getOutputStream());
+ PrintStream out = new PrintStream(resp.getOutputStream(), true, "UTF-8");
resp.setStatus(200);
out.println("<html><body><h2>Commit status</h2><ul>");
@@ -98,6 +103,7 @@ public class CommitCheckServlet extends HttpServlet {
* For now, instead, we'll just do an ls in a bunch of places.
*/
private static class CommitCheckThread extends Thread implements CHUKWA_CONSTANT {
+
int checkInterval = 1000 * 30;
volatile boolean running = true;
final Collection<Path> pathsToSearch;
@@ -116,15 +122,30 @@ public class CommitCheckServlet extends HttpServlet {
this.purgeTime = time;
this.len = len;
}
-
+
+ @Override
+ public boolean equals (Object o) {
+ if(o == null || !(o instanceof PurgeTask)) {
+ return false;
+ }
+ PurgeTask other = (PurgeTask) o;
+ return this.hashCode() == other.hashCode();
+ }
+
+ @Override
public int compareTo(PurgeTask p) {
if(purgeTime < p.purgeTime)
return -1;
- else if (purgeTime == p.purgeTime)
+ else if (this.equals(p))
return 0;
else
return 1;
}
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(3221, 4271).append(purgeTime).toHashCode();
+ }
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
index 613fa3e..dfbe53a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
@@ -22,11 +22,15 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+
import org.apache.log4j.Logger;
+
import java.io.*;
+import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
+
import org.apache.hadoop.chukwa.*;
import org.apache.hadoop.chukwa.datacollection.writer.ExtractorWriter;
import org.apache.hadoop.conf.Configuration;
@@ -61,8 +65,8 @@ public class LogDisplayServlet extends HttpServlet {
public static final String BUF_SIZE_OPT = "chukwaCollector.showLogs.buffer";
long BUF_SIZE = 1024* 1024;
- Configuration conf;
- Map<String, Deque<Chunk>> chunksBySID = new HashMap<String, Deque<Chunk>>();
+ transient Configuration conf;
+ transient Map<String, Deque<Chunk>> chunksBySID;
Queue<String> receivedSIDs = new LinkedList<String>();
long totalStoredSize = 0;
@@ -71,12 +75,20 @@ public class LogDisplayServlet extends HttpServlet {
public LogDisplayServlet() {
conf = new Configuration();
- ExtractorWriter.recipient = this;
+ chunksBySID = new HashMap<String, Deque<Chunk>>();
+ ExtractorWriter.setRecipient(this);
}
public LogDisplayServlet(Configuration c) {
conf = c;
- ExtractorWriter.recipient = this;
+ chunksBySID = new HashMap<String, Deque<Chunk>>();
+ ExtractorWriter.setRecipient(this);
+ }
+
+ public LogDisplayServlet(Configuration c, Map<String, Deque<Chunk>> chunksBySID) {
+ conf = c;
+ this.chunksBySID = chunksBySID;
+ ExtractorWriter.setRecipient(this);
}
public void init(ServletConfig servletConf) throws ServletException {
@@ -93,9 +105,9 @@ public class LogDisplayServlet extends HttpServlet {
MessageDigest md;
md = MessageDigest.getInstance("MD5");
- md.update(c.getSource().getBytes());
- md.update(c.getStreamName().getBytes());
- md.update(c.getTags().getBytes());
+ md.update(c.getSource().getBytes(Charset.forName("UTF-8")));
+ md.update(c.getStreamName().getBytes(Charset.forName("UTF-8")));
+ md.update(c.getTags().getBytes(Charset.forName("UTF-8")));
StringBuilder sb = new StringBuilder();
byte[] bytes = md.digest();
for(int i=0; i < bytes.length; ++i) {
@@ -106,7 +118,6 @@ public class LogDisplayServlet extends HttpServlet {
return sb.toString();
} catch(NoSuchAlgorithmException n) {
log.fatal(n);
- System.exit(0);
return null;
}
}
@@ -146,7 +157,7 @@ public class LogDisplayServlet extends HttpServlet {
protected synchronized void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
- PrintStream out = new PrintStream(new BufferedOutputStream(resp.getOutputStream()));
+ PrintStream out = new PrintStream(new BufferedOutputStream(resp.getOutputStream()), true, "UTF-8");
resp.setStatus(200);
String path = req.getServletPath();
String streamID = req.getParameter("sid");
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
index 5c3ea71..0a78f2f 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
@@ -54,14 +54,14 @@ public class ServletCollector extends HttpServlet {
* If a chunk is committed; then the ack will start with the following string.
*/
public static final String ACK_PREFIX = "ok: ";
- org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null;
+ transient ChukwaWriter writer = null;
private static final long serialVersionUID = 6286162898591407111L;
- Logger log = Logger.getRootLogger();// .getLogger(ServletCollector.class);
+ transient Logger log = Logger.getLogger(ServletCollector.class);
- static boolean COMPRESS;
- static String CODEC_NAME;
- static CompressionCodec codec;
+ boolean COMPRESS;
+ String CODEC_NAME;
+ transient CompressionCodec codec;
public void setWriter(ChukwaWriter w) {
writer = w;
@@ -76,7 +76,7 @@ public class ServletCollector extends HttpServlet {
int numberchunks = 0;
long lifetimechunks = 0;
- Configuration conf;
+ transient Configuration conf;
public ServletCollector(Configuration c) {
conf = c;
@@ -151,7 +151,6 @@ public class ServletCollector extends HttpServlet {
protected void accept(HttpServletRequest req, HttpServletResponse resp)
throws ServletException {
numberHTTPConnection++;
- ServletDiagnostics diagnosticPage = new ServletDiagnostics();
final long currentTime = System.currentTimeMillis();
try {
@@ -173,10 +172,6 @@ public class ServletCollector extends HttpServlet {
final int numEvents = di.readInt();
// log.info("saw " + numEvents+ " in request");
- if (FANCY_DIAGNOSTICS) {
- diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime);
- }
-
List<Chunk> events = new LinkedList<Chunk>();
StringBuilder sb = new StringBuilder();
@@ -184,9 +179,6 @@ public class ServletCollector extends HttpServlet {
ChunkImpl logEvent = ChunkImpl.read(di);
events.add(logEvent);
- if (FANCY_DIAGNOSTICS) {
- diagnosticPage.sawChunk(logEvent, i);
- }
}
int responseStatus = HttpServletResponse.SC_OK;
@@ -226,10 +218,6 @@ public class ServletCollector extends HttpServlet {
l_out.println("can't write: no writer");
}
- if (FANCY_DIAGNOSTICS) {
- diagnosticPage.doneWithPost();
- }
-
resp.setStatus(responseStatus);
} catch (Throwable e) {
@@ -251,7 +239,7 @@ public class ServletCollector extends HttpServlet {
log.info("new GET from " + req.getRemoteHost() + " at " + System.currentTimeMillis());
- PrintStream out = new PrintStream(resp.getOutputStream());
+ PrintStream out = new PrintStream(resp.getOutputStream(), true, "UTF-8");
resp.setStatus(200);
String pingAtt = req.getParameter("ping");
@@ -264,8 +252,6 @@ public class ServletCollector extends HttpServlet {
out.println("lifetimechunks:" + lifetimechunks);
} else {
out.println("<html><body><h2>Chukwa servlet running</h2>");
- if (FANCY_DIAGNOSTICS)
- ServletDiagnostics.printPage(out);
out.println("</body></html>");
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java
index a9bd744..29c1fb5 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java
@@ -28,7 +28,7 @@ public class ChunkCatcherConnector implements Connector {
Timer tm;
- class Interruptor extends TimerTask {
+ static class Interruptor extends TimerTask {
Thread targ;
volatile boolean deactivate = false;
Interruptor(Thread t) {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
index b998139..929d871 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
@@ -59,7 +59,7 @@ public class PipelineConnector implements Connector, Runnable {
ChunkQueue chunkQueue;
- private static volatile ChukwaAgent agent = null;
+ private ChukwaAgent agent = null;
private volatile boolean stopMe = false;
protected ChukwaWriter writers = null;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
index 3bb0dd7..e542b2f 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
@@ -41,6 +41,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
@@ -55,7 +57,7 @@ public class HttpConnector implements Connector, Runnable {
static Logger log = Logger.getLogger(HttpConnector.class);
Timer statTimer = null;
- volatile int chunkCount = 0;
+ AtomicInteger chunkCount = new AtomicInteger();
int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
int MIN_POST_INTERVAL = 5 * 1000;
@@ -78,8 +80,8 @@ public class HttpConnector implements Connector, Runnable {
statTimer = new Timer();
statTimer.schedule(new TimerTask() {
public void run() {
- int count = chunkCount;
- chunkCount = 0;
+ int count = chunkCount.get();
+ chunkCount.set(0);
log.info("# http chunks ACK'ed since last report: " + count);
}
}, 100, 60 * 1000);
@@ -170,7 +172,7 @@ public class HttpConnector implements Connector, Runnable {
// checkpoint the chunks which were committed
for (ChukwaHttpSender.CommitListEntry cle : results) {
agent.reportCommit(cle.adaptor, cle.uuid);
- chunkCount++;
+ chunkCount.set(chunkCount.get()+1);;
}
long now = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
index 6f818e4..2b6617d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
@@ -23,12 +23,10 @@ import org.apache.hadoop.chukwa.datacollection.agent.*;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
-import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.*;
-import org.apache.commons.httpclient.*;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
//import com.google.common.collect.SortedSetMultimap;
@@ -74,7 +72,7 @@ public class AsyncAckSender extends ChukwaHttpSender{
int c = o.aName.compareTo(this.aName);
if(c != 0)
return c;
- c = fname.compareTo(this.fname);
+ c = o.fname.compareTo(this.fname);
if(c != 0)
return c;
if(o.start < start)
@@ -83,7 +81,19 @@ public class AsyncAckSender extends ChukwaHttpSender{
return -1;
else return 0;
}
-
+
+ @Override
+ public boolean equals(Object o) {
+ if(!(o instanceof DelayedCommit)) {
+ return false;
+ }
+ DelayedCommit dc = (DelayedCommit) o;
+ if(this.aName.equals(dc.aName)) {
+ return true;
+ }
+ return false;
+ }
+
public String toString() {
return adaptor +" commits from" + start + " to " + uuid + " when " + fname + " hits " + fOffset;
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
index 1c8c3d2..76727fe 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
@@ -88,9 +88,9 @@ public class ChukwaHttpSender implements ChukwaSender {
protected Iterator<String> collectors;
- static boolean COMPRESS;
- static String CODEC_NAME;
- static CompressionCodec codec;
+ boolean COMPRESS;
+ String CODEC_NAME;
+ CompressionCodec codec;
static {
connectionManager = new MultiThreadedHttpConnectionManager();
@@ -112,9 +112,13 @@ public class ChukwaHttpSender implements ChukwaSender {
// FIXME: probably we're better off with an EventListRequestEntity
static class BuffersRequestEntity implements RequestEntity {
List<DataOutputBuffer> buffers;
+ boolean compress;
+ CompressionCodec codec;
- public BuffersRequestEntity(List<DataOutputBuffer> buf) {
+ public BuffersRequestEntity(List<DataOutputBuffer> buf, boolean compress, CompressionCodec codec) {
buffers = buf;
+ this.compress = compress;
+ this.codec = codec;
}
private long getUncompressedContentLenght(){
@@ -125,7 +129,7 @@ public class ChukwaHttpSender implements ChukwaSender {
}
public long getContentLength() {
- if( COMPRESS) {
+ if(compress) {
return -1;
}
else {
@@ -148,7 +152,7 @@ public class ChukwaHttpSender implements ChukwaSender {
}
public void writeRequest(OutputStream out) throws IOException {
- if( COMPRESS) {
+ if(compress) {
CompressionOutputStream cos = codec.createOutputStream(out);
DataOutputStream dos = new DataOutputStream( cos);
doWriteRequest( dos);
@@ -239,7 +243,7 @@ public class ChukwaHttpSender implements ChukwaSender {
toSend.clear();
// collect all serialized chunks into a single buffer to send
- RequestEntity postData = new BuffersRequestEntity(serializedEvents);
+ RequestEntity postData = new BuffersRequestEntity(serializedEvents, COMPRESS, codec);
PostMethod method = new PostMethod();
method.setRequestEntity(postData);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
index 15cb20a..c636ad2 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.chukwa.datacollection.sender;
import java.io.*;
+import java.nio.charset.Charset;
import java.util.*;
+
import org.apache.hadoop.conf.Configuration;
/***
@@ -40,14 +42,14 @@ public class RetryListOfCollectors implements Iterator<String>, Cloneable {
long lastLookAtFirstNode;
int nextCollector = 0;
private String portNo;
- Configuration conf;
public static final String RETRY_RATE_OPT = "chukwaAgent.connector.retryRate";
public RetryListOfCollectors(File collectorFile, Configuration conf)
throws IOException {
this(conf);
try {
- BufferedReader br = new BufferedReader(new FileReader(collectorFile));
+ FileInputStream fis = new FileInputStream(collectorFile);
+ BufferedReader br = new BufferedReader(new InputStreamReader(fis, Charset.forName("UTF-8")));
String line, parsedline;
while ((line = br.readLine()) != null) {
parsedline = canonicalizeLine(line);
@@ -104,7 +106,6 @@ public class RetryListOfCollectors implements Iterator<String>, Cloneable {
public RetryListOfCollectors(Configuration conf) {
collectors = new ArrayList<String>();
- this.conf = conf;
portNo = conf.get("chukwaCollector.http.port", "8080");
maxRetryRateMs = conf.getInt(RETRY_RATE_OPT, 15 * 1000);
lastLookAtFirstNode = 0;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java
index 7ba7a29..7c2e755 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.*;
import org.apache.hadoop.chukwa.datacollection.agent.*;
import org.apache.hadoop.chukwa.datacollection.connector.Connector;
+
+import java.nio.charset.Charset;
import java.util.*;
/**
@@ -64,7 +66,7 @@ public class ConsoleOutConnector extends Thread implements Connector {
System.out.println("data length was " + e.getData().length
+ ", not printing");
else
- System.out.println(new String(e.getData()));
+ System.out.println(new String(e.getData(), Charset.forName("UTF-8")));
}
agent.reportCommit(e.getInitiator(), e.getSeqID());
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
index b7215c9..d52d58f 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.*;
-import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
-import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter.StatReportingTask;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -45,17 +43,7 @@ public class FilePerPostWriter extends SeqFileWriter {
String baseName;
AtomicLong counter = new AtomicLong(0);
-
- protected FileSystem fs = null;
- protected Configuration conf = null;
-
- protected String outputDir = null;
-// private Calendar calendar = Calendar.getInstance();
- protected Path currentPath = null;
- protected String currentFileName = null;
-
-
@Override
public synchronized CommitStatus add(List<Chunk> chunks) throws WriterException {
@@ -83,12 +71,10 @@ public class FilePerPostWriter extends SeqFileWriter {
+ "/" + chunk.getStreamName());
archiveKey.setSeqId(chunk.getSeqID());
- if (chunk != null) {
// compute size for stats
dataSize += chunk.getData().length;
bytesThisRotate += chunk.getData().length;
seqFileWriter.append(archiveKey, chunk);
- }
}
@@ -129,7 +115,13 @@ public class FilePerPostWriter extends SeqFileWriter {
} catch(Exception e) {
throw new WriterException(e);
}
-
}
+ protected String getCurrentFileName() {
+ return currentFileName;
+ }
+
+ protected Path getCurrentPath() {
+ return currentPath;
+ }
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
index 7aeab22..e00229a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
@@ -25,10 +25,13 @@ import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollecto
import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
import org.apache.hadoop.chukwa.datacollection.writer.ConsoleWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
+
import java.io.*;
import java.util.*;
@@ -36,6 +39,7 @@ public class FileTailerStressTest {
static final int DELAY_MIN = 10 * 1000;
static final int DELAY_RANGE = 2 * 1000;
+ static final Logger log = Logger.getLogger(FileTailerStressTest.class);
static class OccasionalWriterThread extends Thread {
File file;
@@ -45,9 +49,9 @@ public class FileTailerStressTest {
}
public void run() {
+ PrintWriter out = null;
try {
- FileOutputStream fos = new FileOutputStream(file);
- PrintWriter out = new PrintWriter(fos);
+ out = new PrintWriter(file.getAbsolutePath(), "UTF-8");
Random rand = new Random();
while (true) {
int delay = rand.nextInt(DELAY_RANGE) + DELAY_MIN;
@@ -59,6 +63,9 @@ public class FileTailerStressTest {
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
+ if(out != null) {
+ out.close();
+ }
}
}
}
@@ -91,7 +98,9 @@ public class FileTailerStressTest {
ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
File workdir = new File("/tmp/stresstest/");
- workdir.mkdir();
+ if(!workdir.mkdir()) {
+ log.warn("Error creating working directory:" + workdir.getAbsolutePath());
+ }
for (int i = 0; i < FILES_TO_USE; ++i) {
File newTestF = new File("/tmp/stresstest/" + i);
@@ -102,7 +111,9 @@ public class FileTailerStressTest {
Thread.sleep(60 * 1000);
System.out.println("cleaning up");
- workdir.delete();
+ if(!workdir.delete()) {
+ log.warn("Error clean up working directory:" + workdir.getAbsolutePath());
+ }
} catch (Exception e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java
index f48ba9a..db1be4d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java
@@ -19,7 +19,11 @@
package org.apache.hadoop.chukwa.datacollection.test;
+import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.conf.Configuration;
@@ -65,14 +69,16 @@ public class SinkFileValidator {
if (evt.getData().length > 1000) {
System.out.println("got event; data: "
- + new String(evt.getData(), 0, 1000));
+ + new String(evt.getData(), 0, 1000, Charset.forName("UTF-8")));
System.out.println("....[truncating]");
} else
- System.out.println("got event; data: " + new String(evt.getData()));
+ System.out.println("got event; data: " + new String(evt.getData(), Charset.forName("UTF-8")));
events++;
}
System.out.println("file looks OK!");
- } catch (Exception e) {
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (URISyntaxException e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
index 177d013..d8f5335 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
@@ -19,9 +19,11 @@
package org.apache.hadoop.chukwa.datacollection.writer;
+import java.nio.charset.Charset;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
+
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.conf.Configuration;
@@ -84,7 +86,7 @@ public class ConsoleWriter implements ChukwaWriter {
System.out.print(data.getDataType());
System.out.print(") ");
System.out.print(new String(data.getData(), startOffset, offset
- - startOffset + 1));
+ - startOffset + 1, Charset.forName("UTF-8")));
startOffset = offset + 1;
}
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
index cefb42b..2cfd216 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
@@ -57,7 +57,7 @@ public class Dedup extends PipelineableWriter {
final HashSet<EntryType> hs;
final Queue<EntryType> toDrop;
final int maxSize;
- volatile long dupchunks = 0;
+ long dupchunks = 0;
public FixedSizeCache(int size) {
maxSize = size;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
index 1a681f5..a8a281e 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
public class ExtractorWriter extends PipelineableWriter {
- public static LogDisplayServlet recipient;
+ private static LogDisplayServlet recipient;
@Override
public void close() throws WriterException {
@@ -44,4 +44,8 @@ public class ExtractorWriter extends PipelineableWriter {
return ChukwaWriter.COMMIT_OK;
}
+ public static void setRecipient(LogDisplayServlet logDisplayServlet) {
+ recipient = logDisplayServlet;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
index f2d4252..4d9e2a0 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
@@ -44,9 +44,6 @@ public class InMemoryWriter implements ChukwaWriter {
e.printStackTrace();
throw new WriterException(e);
}
- synchronized (this) {
- notify();
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
index e30362d..141be20 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
@@ -111,7 +111,11 @@ public class PipelineStageWriter implements ChukwaWriter {
writer = (ChukwaWriter) st; // one stage pipeline
}
return;
- } catch (Exception e) {
+ } catch (IOException |
+ WriterException |
+ ClassNotFoundException |
+ IllegalAccessException |
+ InstantiationException e) {
// if anything went wrong (missing class, etc) we wind up here.
log.error("failed to set up pipeline, defaulting to SeqFileWriter", e);
// fall through to default case
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
index 3c0d268..3803a2e 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
@@ -47,7 +47,7 @@ import org.apache.log4j.Logger;
*/
public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
static Logger log = Logger.getLogger(SeqFileWriter.class);
- public static boolean ENABLE_ROTATION_ON_CLOSE = true;
+ private static boolean ENABLE_ROTATION_ON_CLOSE = true;
protected int STAT_INTERVAL_SECONDS = 30;
private int rotateInterval = 1000 * 60 * 5;
@@ -60,7 +60,7 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
public static final String IF_FIXED_INTERVAL_OPT = "chukwaCollector.isFixedTimeRotatorScheme";
public static final String FIXED_INTERVAL_OFFSET_OPT = "chukwaCollector.fixedTimeIntervalOffset";
public static final String OUTPUT_DIR_OPT= "chukwaCollector.outputDir";
- protected static String localHostAddr = null;
+ public String localHostAddr = null;
protected final Semaphore lock = new Semaphore(1, true);
@@ -85,7 +85,7 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
protected volatile long bytesThisRotate = 0;
protected volatile boolean isRunning = false;
- static {
+ public SeqFileWriter() {
try {
localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
} catch (UnknownHostException e) {
@@ -93,8 +93,6 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
}
}
- public SeqFileWriter() {}
-
public long getBytesWritten() {
return dataSize;
}
@@ -135,7 +133,7 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
try {
fs = FileSystem.get(new URI(fsname), conf);
if (fs == null) {
- log.error("can't connect to HDFS at " + fs.getUri() + " bail out!");
+ log.error("can't connect to HDFS.");
}
} catch (Throwable e) {
log.error(
@@ -324,49 +322,45 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
throw new WriterException("Collector not ready");
}
- if (chunks != null) {
- ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
-
- if (System.currentTimeMillis() >= nextTimePeriodComputation) {
- computeTimePeriod();
- }
- try {
- lock.acquire();
- for (Chunk chunk : chunks) {
- archiveKey.setTimePartition(timePeriod);
- archiveKey.setDataType(chunk.getDataType());
- archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
- + "/" + chunk.getStreamName());
- archiveKey.setSeqId(chunk.getSeqID());
-
- if (chunk != null) {
- seqFileWriter.append(archiveKey, chunk);
-
- // compute size for stats only if append succeeded. Note though that
- // seqFileWriter.append can continue taking data for quite some time
- // after HDFS goes down while the client is trying to reconnect. Hence
- // these stats might not reflect reality during an HDFS outage.
- dataSize += chunk.getData().length;
- bytesThisRotate += chunk.getData().length;
-
- String futureName = currentPath.getName().replace(".chukwa", ".done");
- result.addPend(futureName, currentOutputStr.getPos());
- }
+ ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+
+ if (System.currentTimeMillis() >= nextTimePeriodComputation) {
+ computeTimePeriod();
+ }
+ try {
+ lock.acquire();
+ for (Chunk chunk : chunks) {
+ archiveKey.setTimePartition(timePeriod);
+ archiveKey.setDataType(chunk.getDataType());
+ archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
+ + "/" + chunk.getStreamName());
+ archiveKey.setSeqId(chunk.getSeqID());
+
+ seqFileWriter.append(archiveKey, chunk);
+
+ // compute size for stats only if append succeeded. Note though that
+ // seqFileWriter.append can continue taking data for quite some time
+ // after HDFS goes down while the client is trying to reconnect. Hence
+ // these stats might not reflect reality during an HDFS outage.
+ dataSize += chunk.getData().length;
+ bytesThisRotate += chunk.getData().length;
+
+ String futureName = currentPath.getName().replace(".chukwa", ".done");
+ result.addPend(futureName, currentOutputStr.getPos());
- }
- }
- catch (IOException e) {
- log.error("IOException when trying to write a chunk, Collector will return error and keep running.", e);
- return COMMIT_FAIL;
- }
- catch (Throwable e) {
- // We don't want to loose anything
- log.fatal("IOException when trying to write a chunk, Collector is going to exit!", e);
- isRunning = false;
- } finally {
- lock.release();
}
}
+ catch (IOException e) {
+ log.error("IOException when trying to write a chunk, Collector will return error and keep running.", e);
+ return COMMIT_FAIL;
+ }
+ catch (Throwable e) {
+ // We don't want to lose anything
+ log.fatal("IOException when trying to write a chunk, Collector is going to exit!", e);
+ isRunning = false;
+ } finally {
+ lock.release();
+ }
return result;
}
@@ -405,5 +399,9 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
lock.release();
}
}
+
+ public static void setEnableRotationOnClose(boolean b) {
+ ENABLE_ROTATION_ON_CLOSE = b;
+ }
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
index 0249b4f..88ec861 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
@@ -20,14 +20,18 @@ package org.apache.hadoop.chukwa.datacollection.writer;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
+
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.util.Filter;
import org.apache.hadoop.chukwa.util.RegexUtil.CheckedPatternSyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
+
import java.net.ServerSocket;
import java.net.Socket;
+import java.nio.charset.Charset;
import java.io.*;
+
import org.apache.hadoop.chukwa.util.ExceptionUtil;
/**
@@ -145,7 +149,7 @@ public class SocketTeeWriter extends PipelineableWriter {
else {
byte[] data = c.getData();
byte[] header = (c.getSource()+ " " + c.getDataType() + " " + c.getStreamName()+ " "+
- c.getSeqID()+"\n").getBytes();
+ c.getSeqID()+"\n").getBytes(Charset.forName("UTF-8"));
out.writeInt(data.length+ header.length);
out.write(header);
out.write(data);
@@ -170,9 +174,12 @@ public class SocketTeeWriter extends PipelineableWriter {
try { //inner try catches bad command syntax errors
sock.setSoTimeout(timeout);
sock.setKeepAlive(USE_KEEPALIVE);
- in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
+ in = new BufferedReader(new InputStreamReader(sock.getInputStream(), Charset.forName("UTF-8")));
out = new DataOutputStream(sock.getOutputStream());
String cmd = in.readLine();
+ if(cmd==null) {
+ throw new IllegalArgumentException("No input found.");
+ }
if(!cmd.contains(" ")) {
throw new IllegalArgumentException(
@@ -198,8 +205,8 @@ public class SocketTeeWriter extends PipelineableWriter {
try {
rules = new Filter(cmdAfterSpace);
} catch (CheckedPatternSyntaxException pse) {
- out.write("Error parsing command as a regex: ".getBytes());
- out.write(pse.getMessage().getBytes());
+ out.write("Error parsing command as a regex: ".getBytes(Charset.forName("UTF-8")));
+ out.write(pse.getMessage().getBytes(Charset.forName("UTF-8")));
out.writeByte('\n');
out.close();
in.close();
@@ -212,10 +219,10 @@ public class SocketTeeWriter extends PipelineableWriter {
synchronized(tees) {
tees.add(this);
}
- out.write("OK\n".getBytes());
+ out.write("OK\n".getBytes(Charset.forName("UTF-8")));
log.info("tee to " + sock.getInetAddress() + " established");
} catch(IllegalArgumentException e) {
- out.write(e.toString().getBytes());
+ out.write(e.toString().getBytes(Charset.forName("UTF-8")));
out.writeByte('\n');
out.close();
in.close();
@@ -239,8 +246,11 @@ public class SocketTeeWriter extends PipelineableWriter {
public void handle(Chunk c) {
//don't ever block; just ignore this chunk if we don't have room for it.
- if(rules.matches(c))
- sendQ.offer(c);
+ if(rules.matches(c)) {
+ if(!sendQ.offer(c)) {
+ log.debug("Queue is full.");
+ }
+ }
}
}
@@ -249,7 +259,6 @@ public class SocketTeeWriter extends PipelineableWriter {
SocketListenThread listenThread;
List<Tee> tees;
- ChukwaWriter next;
@Override
public void setNextStage(ChukwaWriter next) {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
index 02e7907..e0ffdc4 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.util.CopySequenceFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -85,8 +86,8 @@ public class LocalToRemoteHdfsMover extends Thread {
remoteFs = FileSystem.get(new URI(fsname), conf);
if (remoteFs == null && exitIfHDFSNotavailable) {
- log.error("can't connect to HDFS at " + remoteFs.getUri() + " bail out!");
- System.exit(-1);
+ log.error("can't connect to HDFS.");
+ throw new WriterException("can't connect to HDFS.");
}
localFs = FileSystem.getLocal(conf);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
index bb0fdf6..14d9ab8 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -148,7 +149,7 @@ public class LocalWriter implements ChukwaWriter {
}
} catch (Throwable e) {
log.fatal("Cannot initialize LocalWriter", e);
- System.exit(-1);
+ throw new WriterException(e);
}
@@ -184,7 +185,11 @@ public class LocalWriter implements ChukwaWriter {
private class RotateTask extends TimerTask {
public void run() {
- rotate();
+ try {
+ rotate();
+ } catch(WriterException e) {
+ log.error(ExceptionUtil.getStackTrace(e));
+ }
};
}
@@ -245,11 +250,9 @@ public class LocalWriter implements ChukwaWriter {
+ "/" + chunk.getStreamName());
archiveKey.setSeqId(chunk.getSeqID());
- if (chunk != null) {
- seqFileWriter.append(archiveKey, chunk);
- // compute size for stats
- dataSize += chunk.getData().length;
- }
+ seqFileWriter.append(archiveKey, chunk);
+ // compute size for stats
+ dataSize += chunk.getData().length;
}
}// End synchro
long end = System.currentTimeMillis();
@@ -264,7 +267,6 @@ public class LocalWriter implements ChukwaWriter {
if (writeChunkRetries < 0) {
log
.fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
- System.exit(-1);
}
throw new WriterException(e);
}
@@ -272,7 +274,7 @@ public class LocalWriter implements ChukwaWriter {
return COMMIT_OK;
}
- protected void rotate() {
+ protected void rotate() throws WriterException {
isRunning = true;
calendar.setTimeInMillis(System.currentTimeMillis());
log.info("start Date [" + calendar.getTime() + "]");
@@ -316,10 +318,7 @@ public class LocalWriter implements ChukwaWriter {
SequenceFile.CompressionType.NONE, null);
} catch (IOException e) {
- log.fatal("IO Exception in rotate. Exiting!", e);
- // Shutting down the collector
- // Watchdog will re-start it automatically
- System.exit(-1);
+ log.fatal("IO Exception in rotate: ", e);
}
}
@@ -336,8 +335,8 @@ public class LocalWriter implements ChukwaWriter {
}
if (freeSpace < minFreeAvailable) {
- log.fatal("No space left on device, Bail out!");
- System.exit(-1);
+ log.fatal("No space left on device.");
+ throw new WriterException("No space left on device.");
}
log.debug("finished rotate()");
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
index bf64b24..40a6ff0 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
@@ -19,12 +19,10 @@ package org.apache.hadoop.chukwa.datacollection.writer.solr;
import java.io.IOException;
import java.nio.charset.Charset;
-import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
-import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -106,6 +104,9 @@ public class SolrWriter extends PipelineableWriter {
if(data.contains("mapredice")) {
doc.addField(SERVICE, "mapreduce");
}
+ if(data.contains("hbase")) {
+ doc.addField(SERVICE, "hbase");
+ }
try {
Date d = sdf.parse(data);
doc.addField(DATE, d, 1.0f);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java
index c09d1ee..3b8b946 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java
@@ -19,14 +19,16 @@
package org.apache.hadoop.chukwa.dataloader;
import java.io.IOException;
+import java.util.Arrays;
+
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
public abstract class DataLoaderFactory {
- static ChukwaConfiguration conf = null;
- static FileSystem fs = null;
+ ChukwaConfiguration conf = null;
+ FileSystem fs = null;
protected FileStatus[] source = null;
public DataLoaderFactory() {
@@ -37,9 +39,20 @@ public abstract class DataLoaderFactory {
* @throws IOException
*/
public void load(ChukwaConfiguration conf, FileSystem fs, FileStatus[] src) throws IOException {
- this.source=src;
+ this.source=src.clone();
this.conf=conf;
this.fs=fs;
}
+ public FileStatus[] getSource() {
+ return Arrays.copyOf(source, source.length);
+ }
+
+ protected FileSystem getFileSystem() {
+ return fs;
+ }
+
+ protected ChukwaConfiguration getConf() {
+ return conf;
+ }
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java
index 336a09b..009dd2b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java
@@ -43,8 +43,8 @@ public class FSMDataLoader extends DataLoaderFactory {
protected MetricDataLoader threads[] = null;
private static String DATA_LOADER_THREAD_LIMIT = "chukwa.data.loader.threads.limit";
private int size = 1;
- private static CompletionService completion = null;
- private static ExecutorService executor = null;
+ private CompletionService completion = null;
+ private ExecutorService executor = null;
private static String[] mappers = {
"org.apache.hadoop.chukwa.analysis.salsa.fsm.DataNodeClientTraceMapper",
"org.apache.hadoop.chukwa.analysis.salsa.fsm.TaskTrackerClientTraceMapper",