You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@chukwa.apache.org by ey...@apache.org on 2015/07/26 04:08:59 UTC
[6/8] chukwa git commit: CHUKWA-771. Improved code quality issue
identified by findbugs. (Eric Yang)
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
index ae9233c..5538a40 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
@@ -27,6 +27,7 @@ import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.concurrent.Callable;
@@ -62,10 +63,15 @@ public class MetricDataLoader implements Callable {
private Connection conn = null;
private Path source = null;
- private static ChukwaConfiguration conf = null;
- private static FileSystem fs = null;
+ private ChukwaConfiguration conf = null;
+ private FileSystem fs = null;
private String jdbc_url = "";
+ public MetricDataLoader(String fileName) throws IOException {
+ conf = new ChukwaConfiguration();
+ fs = FileSystem.get(conf);
+ }
+
/** Creates a new instance of DBWriter */
public MetricDataLoader(ChukwaConfiguration conf, FileSystem fs, String fileName) {
source = new Path(fileName);
@@ -171,6 +177,9 @@ public class MetricDataLoader implements Callable {
return( sb.toString());
}
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
+ "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE",
+ justification = "Dynamic based upon tables in the database")
public boolean run() throws IOException {
boolean first=true;
log.info("StreamName: " + source.getName());
@@ -195,7 +204,7 @@ public class MetricDataLoader implements Callable {
try {
Pattern p = Pattern.compile("(.*)\\-(\\d+)$");
int batch = 0;
- while (reader.next(key, record)) {
+ while (reader !=null && reader.next(key, record)) {
numOfRecords++;
if(first) {
try {
@@ -336,12 +345,9 @@ public class MetricDataLoader implements Callable {
}
}
- Iterator<String> i = hashReport.keySet().iterator();
- while (i.hasNext()) {
- Object iteratorNode = i.next();
- HashMap<String, String> recordSet = hashReport.get(iteratorNode);
- Iterator<String> fi = recordSet.keySet().iterator();
- // Map any primary key that was not included in the report keyName
+ for(Entry<String, HashMap<String, String>> entry : hashReport.entrySet()) {
+ HashMap<String, String> recordSet = entry.getValue();
+ // Map any primary key that was not included in the report keyName
StringBuilder sqlPriKeys = new StringBuilder();
try {
for (String priKey : priKeys) {
@@ -363,8 +369,9 @@ public class MetricDataLoader implements Callable {
// Map the hash objects to database table columns
StringBuilder sqlValues = new StringBuilder();
boolean firstValue = true;
- while (fi.hasNext()) {
- String fieldKey = fi.next();
+ for(Entry<String, String> fi : recordSet.entrySet()) {
+ String fieldKey = fi.getKey();
+ String fieldValue = fi.getValue();
if (transformer.containsKey(fieldKey) && transformer.get(fieldKey).intern()!="_delete".intern()) {
if (!firstValue) {
sqlValues.append(", ");
@@ -378,12 +385,12 @@ public class MetricDataLoader implements Callable {
if (conversion.containsKey(conversionKey)) {
sqlValues.append(transformer.get(fieldKey));
sqlValues.append("=");
- sqlValues.append(recordSet.get(fieldKey));
+ sqlValues.append(fieldValue);
sqlValues.append(conversion.get(conversionKey).toString());
} else {
sqlValues.append(transformer.get(fieldKey));
sqlValues.append("=\'");
- sqlValues.append(escapeQuotes(recordSet.get(fieldKey)));
+ sqlValues.append(escapeQuotes(fieldValue));
sqlValues.append("\'");
}
} else if (dbSchema.get(dbTables.get(dbKey)).get(
@@ -391,8 +398,7 @@ public class MetricDataLoader implements Callable {
SimpleDateFormat formatter = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
Date recordDate = new Date();
- recordDate.setTime(Long.parseLong(recordSet
- .get(fieldKey)));
+ recordDate.setTime(Long.parseLong(fieldValue));
sqlValues.append(transformer.get(fieldKey));
sqlValues.append("=\"");
sqlValues.append(formatter.format(recordDate));
@@ -405,7 +411,7 @@ public class MetricDataLoader implements Callable {
transformer.get(fieldKey)) == java.sql.Types.INTEGER) {
long tmp = 0;
try {
- tmp = Long.parseLong(recordSet.get(fieldKey).toString());
+ tmp = Long.parseLong(fieldValue);
String conversionKey = "conversion." + fieldKey;
if (conversion.containsKey(conversionKey)) {
tmp = tmp
@@ -420,7 +426,7 @@ public class MetricDataLoader implements Callable {
sqlValues.append(tmp);
} else {
double tmp = 0;
- tmp = Double.parseDouble(recordSet.get(fieldKey).toString());
+ tmp = Double.parseDouble(fieldValue);
String conversionKey = "conversion." + fieldKey;
if (conversion.containsKey(conversionKey)) {
tmp = tmp
@@ -455,7 +461,6 @@ public class MetricDataLoader implements Callable {
}
}
}
-
StringBuilder sql = new StringBuilder();
if (sqlPriKeys.length() > 0) {
sql.append("INSERT INTO ");
@@ -587,9 +592,7 @@ public class MetricDataLoader implements Callable {
public static void main(String[] args) {
try {
- conf = new ChukwaConfiguration();
- fs = FileSystem.get(conf);
- MetricDataLoader mdl = new MetricDataLoader(conf, fs, args[0]);
+ MetricDataLoader mdl = new MetricDataLoader(args[0]);
mdl.run();
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java
index b763087..5ad5258 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java
@@ -37,8 +37,8 @@ public class MetricDataLoaderPool extends DataLoaderFactory {
protected MetricDataLoader threads[] = null;
private static String DATA_LOADER_THREAD_LIMIT = "chukwa.data.loader.threads.limit";
private int size = 1;
- private static CompletionService completion = null;
- private static ExecutorService executor = null;
+ private CompletionService completion = null;
+ private ExecutorService executor = null;
public MetricDataLoaderPool() {
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java
index 1cf801c..c47bdff 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java
@@ -22,6 +22,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
+import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -100,7 +101,7 @@ public class SocketDataLoader implements Runnable {
output.append(" all");
}
output.append("\n");
- dos.write((output.toString()).getBytes());
+ dos.write((output.toString()).getBytes(Charset.forName("UTF-8")));
} catch (SocketException e) {
log.warn("Error while settin soTimeout to 120000");
}
@@ -135,7 +136,7 @@ public class SocketDataLoader implements Runnable {
/*
* Unsubscribe from Chukwa collector and stop streaming.
*/
- public void stop() {
+ public synchronized void stop() {
if(s!=null) {
try {
dis.close();
@@ -169,7 +170,7 @@ public class SocketDataLoader implements Runnable {
* into SDL queue.
*/
@Override
- public void run() {
+ public synchronized void run() {
try {
Chunk c;
while ((c = ChunkImpl.read(dis)) != null) {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java
index 196a38a..8802fcf 100755
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.chukwa.datastore;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.Charset;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
-
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.hicc.HiccWebServer;
import org.apache.hadoop.chukwa.rest.bean.UserBean;
@@ -44,10 +44,14 @@ public class UserStore {
private static Log log = LogFactory.getLog(UserStore.class);
private static Configuration config = new Configuration();
private static ChukwaConfiguration chukwaConf = new ChukwaConfiguration();
- private static String hiccPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"users";
+ private static String hiccPath = null;
+
+ static {
+ config = HiccWebServer.getConfig();
+ hiccPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"users";
+}
public UserStore() throws IllegalAccessException {
- UserStore.config = HiccWebServer.getConfig();
}
public UserStore(String uid) throws IllegalAccessException {
@@ -73,7 +77,7 @@ public class UserStore {
viewStream.readFully(buffer);
viewStream.close();
try {
- JSONObject json = (JSONObject) JSONValue.parse(new String(buffer));
+ JSONObject json = (JSONObject) JSONValue.parse(new String(buffer, Charset.forName("UTF-8")));
profile = new UserBean(json);
} catch (Exception e) {
log.error(ExceptionUtil.getStackTrace(e));
@@ -110,7 +114,7 @@ public class UserStore {
try {
fs = FileSystem.get(config);
FSDataOutputStream out = fs.create(profileFile,true);
- out.write(profile.deserialize().toString().getBytes());
+ out.write(profile.deserialize().toString().getBytes(Charset.forName("UTF-8")));
out.close();
} catch (IOException ex) {
log.error(ExceptionUtil.getStackTrace(ex));
@@ -138,7 +142,7 @@ public class UserStore {
profileStream.readFully(buffer);
profileStream.close();
try {
- UserBean user = new UserBean((JSONObject) JSONValue.parse(new String(buffer)));
+ UserBean user = new UserBean((JSONObject) JSONValue.parse(new String(buffer, Charset.forName("UTF-8"))));
list.add(user.getId());
} catch (Exception e) {
log.error(ExceptionUtil.getStackTrace(e));
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java
index 9db639f..258300c 100755
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java
@@ -20,13 +20,12 @@ package org.apache.hadoop.chukwa.datastore;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.hicc.HiccWebServer;
import org.apache.hadoop.chukwa.rest.bean.ViewBean;
@@ -44,15 +43,18 @@ public class ViewStore {
private String uid = null;
private ViewBean view = null;
private static Log log = LogFactory.getLog(ViewStore.class);
- private static Configuration config = new Configuration();
+ private static Configuration config = null;
private static ChukwaConfiguration chukwaConf = new ChukwaConfiguration();
- private static String viewPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"views";
+ private static String viewPath = null;
private static String publicViewPath = viewPath+File.separator+"public";
private static String usersViewPath = viewPath+File.separator+"users";
private static String PUBLIC = "public".intern();
+ static {
+ config = HiccWebServer.getConfig();
+ viewPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"views";
+ }
public ViewStore() throws IllegalAccessException {
- ViewStore.config = HiccWebServer.getConfig();
}
public ViewStore(String uid, String vid) throws IllegalAccessException {
@@ -141,7 +143,7 @@ public class ViewStore {
try {
FileSystem fs = FileSystem.get(config);
FSDataOutputStream out = fs.create(viewFile,true);
- out.write(view.deserialize().toString().getBytes());
+ out.write(view.deserialize().toString().getBytes(Charset.forName("UTF-8")));
out.close();
} catch (IOException ex) {
log.error(ExceptionUtil.getStackTrace(ex));
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java
index 10343d5..9512824 100755
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.chukwa.datastore;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
-
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.hicc.HiccWebServer;
import org.apache.hadoop.chukwa.rest.bean.CatalogBean;
@@ -43,12 +43,15 @@ public class WidgetStore {
private static Log log = LogFactory.getLog(WidgetStore.class);
private static Configuration config = new Configuration();
private static ChukwaConfiguration chukwaConf = new ChukwaConfiguration();
- private static String hiccPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"widgets";
+ private static String hiccPath = null;
private static CatalogBean catalog = null;
private static HashMap<String, WidgetBean> list = new HashMap<String, WidgetBean>();
+ static {
+ config = HiccWebServer.getConfig();
+ hiccPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"widgets";
+ }
public WidgetStore() throws IllegalAccessException {
- WidgetStore.config = HiccWebServer.getConfig();
}
public void set(WidgetBean widget) throws IllegalAccessException {
@@ -94,7 +97,7 @@ public class WidgetStore {
widgetStream.readFully(buffer);
widgetStream.close();
try {
- JSONObject widgetBuffer = (JSONObject) JSONValue.parse(new String(buffer));
+ JSONObject widgetBuffer = (JSONObject) JSONValue.parse(new String(buffer, Charset.forName("UTF-8")));
WidgetBean widget = new WidgetBean(widgetBuffer);
catalog.addCatalog(widget);
list.put(widget.getId(),widget);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java b/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
index 7bf451e..db71668 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
@@ -30,8 +30,11 @@ import java.io.OutputStreamWriter;
import java.net.URL;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
+import java.util.Map.Entry;
/**
* Trigger action that makes an HTTP request when executed.
@@ -129,14 +132,16 @@ public class HttpTriggerAction implements TriggerAction {
// set headers
boolean contentLengthExists = false;
if (headers != null) {
- for (String name: headers.keySet()) {
+ for(Entry<String, String> entry : headers.entrySet()) {
+ String name = entry.getKey();
+ String value = entry.getValue();
if (log.isDebugEnabled()) {
- log.debug("Setting header " + name + ": " + headers.get(name));
+ log.debug("Setting header " + name + ": " + value);
}
if (name.equalsIgnoreCase("content-length")) {
contentLengthExists = true;
}
- conn.setRequestProperty(name, headers.get(name));
+ conn.setRequestProperty(name, value);
}
}
@@ -149,7 +154,7 @@ public class HttpTriggerAction implements TriggerAction {
// send body if it exists
if (body != null) {
conn.setDoOutput(true);
- OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream());
+ OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream(), Charset.forName("UTF-8"));
writer.write(body);
writer.flush();
writer.close();
@@ -169,7 +174,7 @@ public class HttpTriggerAction implements TriggerAction {
}
else {
BufferedReader reader = new BufferedReader(
- new InputStreamReader(conn.getInputStream()));
+ new InputStreamReader(conn.getInputStream(), Charset.forName("UTF-8")));
String line;
StringBuilder sb = new StringBuilder();
while ((line = reader.readLine()) != null) {
@@ -215,7 +220,7 @@ public class HttpTriggerAction implements TriggerAction {
for (String header : headersSplit) {
String[] nvp = header.split(":", 2);
if (nvp.length < 2) {
- log.error("Invalid HTTP header found: " + nvp);
+ log.error("Invalid HTTP header found: " + Arrays.toString(nvp));
continue;
}
headerMap.put(nvp[0].trim(), nvp[1].trim());
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java b/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
index 1ef6c00..6cb0dc1 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
@@ -33,7 +33,7 @@ import org.apache.log4j.Logger;
public class ChukwaArchiveManager implements CHUKWA_CONSTANT {
static Logger log = Logger.getLogger(ChukwaArchiveManager.class);
- static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
+ SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
static final int ONE_HOUR = 60 * 60 * 1000;
static final int ONE_DAY = 24*ONE_HOUR;
@@ -113,7 +113,7 @@ public class ChukwaArchiveManager implements CHUKWA_CONSTANT {
if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
log.warn("==================\nToo many errors (" + errorCount +
"), Bail out!\n==================");
- System.exit(-1);
+ break;
}
// /chukwa/archives/<YYYYMMDD>/dataSinkDirXXX
// to
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
index d1e2b24..bebd1e5 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
@@ -165,7 +165,7 @@ public class DailyChukwaRecordRolling extends Configured implements Tool {
new DailyChukwaRecordRolling(), mergeArgs, deleteRawdata);
List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
if (rollInSequence) {
- merge.run();
+ merge.mergeRecords();
} else {
allMerge.add(merge);
merge.start();
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
index be63e16..71ac1f7 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
@@ -61,16 +61,28 @@ import org.apache.log4j.Logger;
public class Demux extends Configured implements Tool {
static Logger log = Logger.getLogger(Demux.class);
- static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
public static Configuration jobConf = null;
+ protected static void setJobConf(JobConf jobConf) {
+ Demux.jobConf = jobConf;
+ }
+
+ protected Configuration getJobConf() {
+ return Demux.jobConf;
+ }
public static class MapClass extends MapReduceBase implements
Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
+ private Configuration jobConf = null;
+
@Override
public void configure(JobConf jobConf) {
super.configure(jobConf);
- Demux.jobConf = jobConf;
+ setJobConf(jobConf);
+ }
+
+ private void setJobConf(JobConf jobConf) {
+ this.jobConf = jobConf;
}
public void map(ChukwaArchiveKey key, ChunkImpl chunk,
@@ -82,15 +94,15 @@ public class Demux extends Configured implements Tool {
try {
long duration = System.currentTimeMillis();
if (log.isDebugEnabled()) {
- log.debug("Entry: [" + chunk.getData() + "] EventType: ["
+ log.debug("Entry: [" + String.valueOf(chunk.getData()) + "] EventType: ["
+ chunk.getDataType() + "]");
}
- String defaultProcessor = Demux.jobConf.get(
+ String defaultProcessor = jobConf.get(
"chukwa.demux.mapper.default.processor",
"org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
- String processorClass_pri = Demux.jobConf.get(chunk.getDataType(),
+ String processorClass_pri = jobConf.get(chunk.getDataType(),
defaultProcessor);
String processorClass = processorClass_pri.split(",")[0];
@@ -125,9 +137,11 @@ public class Demux extends Configured implements Tool {
public static class ReduceClass extends MapReduceBase implements
Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {
+ private Configuration jobConf = null;
+
public void configure(JobConf jobConf) {
super.configure(jobConf);
- Demux.jobConf = jobConf;
+ this.jobConf = jobConf;
}
public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
@@ -143,10 +157,10 @@ public class Demux extends Configured implements Tool {
String defaultProcessor_classname = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer" +
".IdentityReducer";
- String defaultProcessor = Demux.jobConf.get("chukwa.demux.reducer.default.processor",
+ String defaultProcessor = jobConf.get("chukwa.demux.reducer.default.processor",
"," + defaultProcessor_classname);
- String processClass_pri = Demux.jobConf.get(key.getReduceType(), defaultProcessor);
+ String processClass_pri = jobConf.get(key.getReduceType(), defaultProcessor);
String[] processClass_tmps = processClass_pri.split(",");
String processClass = null;
if (processClass_tmps.length != 2)
@@ -199,7 +213,7 @@ public class Demux extends Configured implements Tool {
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(new ChukwaConfiguration(), Demux.class);
-
+ SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setMapperClass(Demux.MapClass.class);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
index 8fd155e..9fcb65b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
@@ -38,8 +38,8 @@ import org.apache.log4j.Logger;
public class DemuxManager implements CHUKWA_CONSTANT {
static Logger log = Logger.getLogger(DemuxManager.class);
- static int globalErrorcounter = 0;
- static Date firstErrorTime = null;
+ int globalErrorcounter = 0;
+ Date firstErrorTime = null;
protected int ERROR_SLEEP_TIME = 60;
protected int NO_DATASINK_SLEEP_TIME = 20;
@@ -144,7 +144,7 @@ public class DemuxManager implements CHUKWA_CONSTANT {
+ nagiosPort + ", reportingHost:" + reportingHost);
- if (nagiosHost == null || nagiosHost.length() == 0 || nagiosPort == 0 || reportingHost.length() == 0 || reportingHost == null) {
+ if (nagiosHost == null || nagiosHost.length() == 0 || nagiosPort == 0 || reportingHost == null || reportingHost.length() == 0) {
sendAlert = false;
log.warn("Alerting is OFF");
}
@@ -159,7 +159,7 @@ public class DemuxManager implements CHUKWA_CONSTANT {
if (maxPermittedErrorCount != -1 && globalErrorcounter >= maxPermittedErrorCount) {
log.warn("==================\nToo many errors (" + globalErrorcounter +
"), Bail out!\n==================");
- System.exit(-1);
+ break;
}
// Check for anomalies
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
index c8f2799..b59b229 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
@@ -121,7 +121,7 @@ public class HourlyChukwaRecordRolling extends Configured implements Tool {
new HourlyChukwaRecordRolling(), mergeArgs, deleteRawdata);
List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
if (rollInSequence) {
- merge.run();
+ merge.mergeRecords();
} else {
allMerge.add(merge);
merge.start();
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
index 9685471..df1ff88 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
@@ -28,7 +28,17 @@ import java.util.List;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.dataloader.DataLoaderFactory;
-import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_ROOT_REPOS_DIR_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_REPOS_DIR_NAME;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POSTPROCESS_MAX_ERROR_COUNT_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.POST_DEMUX_DATA_LOADER;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.POST_DEMUX_SUCCESS_ACTION;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.chukwa.util.HierarchyDataType;
import org.apache.hadoop.chukwa.datatrigger.TriggerAction;
@@ -39,11 +49,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.log4j.Logger;
-public class PostProcessorManager implements CHUKWA_CONSTANT{
+public class PostProcessorManager {
static Logger log = Logger.getLogger(PostProcessorManager.class);
- protected static HashMap<String, String> dataSources = new HashMap<String, String>();
- public static int errorCount = 0;
+ protected HashMap<String, String> dataSources = new HashMap<String, String>();
+ protected int errorCount = 0;
protected int ERROR_SLEEP_TIME = 60;
protected ChukwaConfiguration conf = null;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
index 4b26e45..363016b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
@@ -43,12 +43,16 @@ public class RecordMerger extends Thread {
this.conf = conf;
this.fs = fs;
this.tool = tool;
- this.mergeArgs = mergeArgs;
+ this.mergeArgs = mergeArgs.clone();
this.deleteRawData = deleteRawData;
}
@Override
public void run() {
+ mergeRecords();
+ }
+
+ void mergeRecords() {
System.out.println("\t Running Merge! : output [" + mergeArgs[1] + "]");
int res;
try {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
index f11b727..72574eb 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
@@ -19,7 +19,9 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+import java.nio.charset.Charset;
import java.util.Calendar;
+
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
@@ -121,7 +123,7 @@ public abstract class AbstractProcessor implements MapProcessor {
protected String nextLine() {
String log = new String(bytes, startOffset, (recordOffsets[currentPos]
- - startOffset + 1));
+ - startOffset + 1), Charset.forName("UTF-8"));
startOffset = recordOffsets[currentPos] + 1;
currentPos++;
return RecordConstants.recoverRecordSeparators("\n", log);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
index 61ba28c..90a561c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
@@ -19,7 +19,9 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+import java.nio.charset.Charset;
import java.util.Calendar;
+
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -60,7 +62,7 @@ public class ChunkSaver {
DataOutputBuffer ob = new DataOutputBuffer(chunk
.getSerializedSizeEstimate());
chunk.write(ob);
- record.add(Record.chunkDataField, new String(ob.getData()));
+ record.add(Record.chunkDataField, new String(ob.getData(), Charset.forName("UTF-8")));
record.add(Record.chunkExceptionField, ExceptionUtil
.getStackTrace(throwable));
output.collect(key, record);
@@ -73,7 +75,7 @@ public class ChunkSaver {
+ " - source:" + chunk.getSource() + " - dataType: "
+ chunk.getDataType() + " - Stream: " + chunk.getStreamName()
+ " - SeqId: " + chunk.getSeqID() + " - Data: "
- + new String(chunk.getData()));
+ + new String(chunk.getData(), Charset.forName("UTF-8")));
} catch (Throwable e1) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java
index f249512..afc78ed 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java
@@ -130,7 +130,7 @@ public class ClientTraceProcessor extends AbstractProcessor {
rec.add(Record.tagsField, chunk.getTags());
rec.add(Record.sourceField, chunk.getSource());
rec.add(Record.applicationField, chunk.getStreamName());
- rec.add("actual_time",(new Long(ms_fullresolution)).toString());
+ rec.add("actual_time",Long.toString(ms_fullresolution));
output.collect(key, rec);
} catch (ParseException e) {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
index 4e5765d..85db7fc 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
@@ -118,11 +118,13 @@ public class DatanodeProcessor extends AbstractProcessor {
} else {
timeStamp = Long.parseLong(ttTag);
}
- Iterator<String> keys = obj.keySet().iterator();
+ @SuppressWarnings("unchecked")
+ Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
while (keys.hasNext()) {
- String key = keys.next();
- Object value = obj.get(key);
+ Map.Entry<String, ?> entry = keys.next();
+ String key = entry.getKey();
+ Object value = entry.getValue();
String valueString = value == null ? "" : value.toString();
// Calculate rate for some of the metrics
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
index c04e752..924f6aa 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+import java.nio.charset.Charset;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
@@ -67,11 +68,13 @@ public class HBaseMasterProcessor extends AbstractProcessor {
} else {
timeStamp = Long.parseLong(ttTag);
}
- Iterator<String> keys = obj.keySet().iterator();
+ @SuppressWarnings("unchecked")
+ Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
while (keys.hasNext()) {
- String key = keys.next();
- Object value = obj.get(key);
+ Map.Entry<String, ?> entry = keys.next();
+ String key = entry.getKey();
+ Object value = entry.getValue();
String valueString = value == null ? "" : value.toString();
// Calculate rate for some of the metrics
@@ -88,7 +91,7 @@ public class HBaseMasterProcessor extends AbstractProcessor {
valueString = Long.toString(newValue);
}
- Buffer b = new Buffer(valueString.getBytes());
+ Buffer b = new Buffer(valueString.getBytes(Charset.forName("UTF-8")));
metricsMap.put(key, b);
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
index 8fab057..6ea0169 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+import java.nio.charset.Charset;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
@@ -60,13 +61,15 @@ public class HBaseRegionServerProcessor extends AbstractProcessor {
} else {
timeStamp = Long.parseLong(ttTag);
}
- Iterator<String> keys = obj.keySet().iterator();
+ @SuppressWarnings("unchecked")
+ Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
while (keys.hasNext()) {
- String key = keys.next();
- Object value = obj.get(key);
+ Map.Entry<String, ?> entry = keys.next();
+ String key = entry.getKey();
+ Object value = entry.getValue();
String valueString = value == null ? "" : value.toString();
- Buffer b = new Buffer(valueString.getBytes());
+ Buffer b = new Buffer(valueString.getBytes(Charset.forName("UTF-8")));
metricsMap.put(key, b);
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
index f671049..8351e84 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
@@ -25,6 +25,7 @@ import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
+import java.util.Map;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
@@ -91,24 +92,26 @@ public class HadoopMetricsProcessor extends AbstractProcessor {
String contextName = null;
String recordName = null;
- Iterator<String> ki = json.keySet().iterator();
+ Iterator<Map.Entry<String, ?>> ki = json.entrySet().iterator();
while (ki.hasNext()) {
- String keyName = ki.next();
+ Map.Entry<String, ?> entry = ki.next();
+ String keyName = entry.getKey();
+ Object value = entry.getValue();
if (chukwaTimestampField.intern() == keyName.intern()) {
- d = new Date((Long) json.get(keyName));
+ d = new Date((Long) value);
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(d.getTime());
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
d.setTime(cal.getTimeInMillis());
} else if (contextNameField.intern() == keyName.intern()) {
- contextName = (String) json.get(keyName);
+ contextName = (String) value;
} else if (recordNameField.intern() == keyName.intern()) {
- recordName = (String) json.get(keyName);
- record.add(keyName, json.get(keyName).toString());
+ recordName = (String) value;
+ record.add(keyName, value.toString());
} else {
- if(json.get(keyName)!=null) {
- record.add(keyName, json.get(keyName).toString());
+ if(value != null) {
+ record.add(keyName, value.toString());
}
}
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
index 7e2e4e2..b910165 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.Calendar;
import java.util.Random;
import java.util.regex.Matcher;
@@ -78,13 +80,14 @@ public class JobConfProcessor extends AbstractProcessor {
= DocumentBuilderFactory.newInstance();
//ignore all comments inside the xml file
docBuilderFactory.setIgnoringComments(true);
+ FileOutputStream out = null;
try {
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
Document doc = null;
String fileName = "test_"+randomNumber.nextInt();
File tmp = new File(fileName);
- FileOutputStream out = new FileOutputStream(tmp);
- out.write(recordEntry.getBytes());
+ out = new FileOutputStream(tmp);
+ out.write(recordEntry.getBytes(Charset.forName("UTF-8")));
out.close();
doc = builder.parse(fileName);
Element root = doc.getDocumentElement();
@@ -139,10 +142,15 @@ public class JobConfProcessor extends AbstractProcessor {
buildGenericRecord(jobConfRecord, null, time, jobConfData);
output.collect(key, jobConfRecord);
- tmp.delete();
- } catch(Exception e) {
- e.printStackTrace();
- throw e;
+ if(!tmp.delete()) {
+ log.warn(tmp.getAbsolutePath() + " cannot be deleted.");
+ }
+ } catch(IOException e) {
+ if(out != null) {
+ out.close();
+ }
+ e.printStackTrace();
+ throw e;
}
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java
index 5a2a851..fdd51f2 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -38,13 +39,12 @@ public class JobLogHistoryProcessor extends AbstractProcessor {
static Logger log = Logger.getLogger(JobLogHistoryProcessor.class);
private static final String recordType = "JobLogHistory";
- private static String internalRegex = null;
- private static Pattern ip = null;
+ private static final String internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
+ private Pattern ip = null;
private Matcher internalMatcher = null;
public JobLogHistoryProcessor() {
- internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
ip = Pattern.compile(internalRegex);
internalMatcher = ip.matcher("-");
}
@@ -331,10 +331,8 @@ public class JobLogHistoryProcessor extends AbstractProcessor {
record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
}
- Iterator<String> it = keys.keySet().iterator();
- while (it.hasNext()) {
- String field = it.next();
- record.add(field, keys.get(field));
+ for(Entry<String, String> entry : keys.entrySet()) {
+ record.add(entry.getKey(), entry.getValue());
}
output.collect(key, record);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
index c2f7b52..d42c329 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
@@ -22,6 +22,8 @@ import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
@@ -113,12 +115,9 @@ public class JobTrackerProcessor extends AbstractProcessor {
} else {
timeStamp = Long.parseLong(ttTag);
}
- Iterator<String> keys = obj.keySet().iterator();
-
- while (keys.hasNext()) {
- String key = keys.next();
- Object value = obj.get(key);
- String valueString = value == null ? "" : value.toString();
+ for(Entry<String, Object> entry : (Set<Entry<String, Object>>) obj.entrySet()) {
+ String key = entry.getKey();
+ String valueString = entry.getValue() == null ? "" : entry.getValue().toString();
// Calculate rate for some of the metrics
if (rateMap.containsKey(key)) {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
index 79291a1..3962628 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
@@ -19,13 +19,15 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -67,10 +69,9 @@ public class Log4JMetricsContextProcessor extends AbstractProcessor {
recordType += "_" + recordName;
}
- Iterator<String> ki = json.keySet().iterator();
- while (ki.hasNext()) {
- String key = ki.next();
- String value = String.valueOf(json.get(key));
+ for(Entry<String, Object> entry : (Set<Map.Entry>) json.entrySet()) {
+ String key = entry.getKey();
+ String value = String.valueOf(entry.getValue());
if(value != null) {
chukwaRecord.add(key, value);
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
index 272980a..2cb0980 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
@@ -34,13 +34,12 @@ public class Log4jJobHistoryProcessor extends AbstractProcessor {
static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class);
private static final String recordType = "JobLogHistory";
- private static String internalRegex = null;
- private static Pattern ip = null;
+ private static final String internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
+ private Pattern ip = null;
private Matcher internalMatcher = null;
public Log4jJobHistoryProcessor() {
- internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
ip = Pattern.compile(internalRegex);
internalMatcher = ip.matcher("-");
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
index 400bd78..5b75939 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
@@ -22,7 +22,7 @@ import java.text.SimpleDateFormat;
import java.util.Date;
public class LogEntry {
- private final static SimpleDateFormat sdf = new SimpleDateFormat(
+ private final SimpleDateFormat sdf = new SimpleDateFormat(
"yyyy-MM-dd HH:mm");
private Date date;
@@ -43,11 +43,11 @@ public class LogEntry {
}
public Date getDate() {
- return date;
+ return date == null ? null : (Date) date.clone();
}
public void setDate(Date date) {
- this.date = date;
+ this.date = date == null ? null : (Date) date.clone();
}
public String getLogLevel() {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
index 1e6e9d7..075ab5c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
@@ -131,11 +131,13 @@ public class NamenodeProcessor extends AbstractProcessor {
} else {
timeStamp = Long.parseLong(ttTag);
}
- Iterator<String> keys = obj.keySet().iterator();
+ @SuppressWarnings("unchecked")
+ Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
while (keys.hasNext()) {
- String key = keys.next();
- Object value = obj.get(key);
+ Map.Entry<String, ?> entry = keys.next();
+ String key = entry.getKey();
+ Object value = entry.getValue();
String valueString = (value == null) ? "" : value.toString();
// These metrics are string types with JSON structure. So we parse them
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
index 4c643a2..d38673c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
@@ -49,13 +49,6 @@ public class SysLog extends AbstractProcessor {
throws Throwable {
try {
String dStr = recordEntry.substring(0, 15);
- int start = 15;
- int idx = recordEntry.indexOf(' ', start);
- start = idx + 1;
- idx = recordEntry.indexOf(' ', start);
- String body = recordEntry.substring(idx + 1);
- body = body.replaceAll("\n", "");
-
Calendar convertDate = Calendar.getInstance();
Date d = sdf.parse(dStr);
int year = convertDate.get(Calendar.YEAR);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java
index e293543..3fa1816 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java
@@ -24,6 +24,7 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
import java.util.Calendar;
import java.util.Iterator;
+import java.util.Map;
import java.util.TimeZone;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
@@ -69,14 +70,17 @@ public class SystemMetrics extends AbstractProcessor {
continue;
}
actualSize++;
- Iterator<String> keys = cpu.keySet().iterator();
combined = combined + Double.parseDouble(cpu.get("combined").toString());
user = user + Double.parseDouble(cpu.get("user").toString());
sys = sys + Double.parseDouble(cpu.get("sys").toString());
idle = idle + Double.parseDouble(cpu.get("idle").toString());
+ @SuppressWarnings("unchecked")
+ Iterator<Map.Entry<String, ?>> keys = cpu.entrySet().iterator();
while(keys.hasNext()) {
- String key = keys.next();
- record.add(key + "." + i, cpu.get(key).toString());
+ Map.Entry<String, ?> entry = keys.next();
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ record.add(key + "." + i, value.toString());
}
}
combined = combined / actualSize;
@@ -101,20 +105,26 @@ public class SystemMetrics extends AbstractProcessor {
record = new ChukwaRecord();
JSONObject memory = (JSONObject) json.get("memory");
- Iterator<String> memKeys = memory.keySet().iterator();
+ @SuppressWarnings("unchecked")
+ Iterator<Map.Entry<String, ?>> memKeys = memory.entrySet().iterator();
while(memKeys.hasNext()) {
- String key = memKeys.next();
- record.add(key, memory.get(key).toString());
+ Map.Entry<String, ?> entry = memKeys.next();
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ record.add(key, value.toString());
}
buildGenericRecord(record, null, cal.getTimeInMillis(), "memory");
output.collect(key, record);
record = new ChukwaRecord();
JSONObject swap = (JSONObject) json.get("swap");
- Iterator<String> swapKeys = swap.keySet().iterator();
+ @SuppressWarnings("unchecked")
+ Iterator<Map.Entry<String, ?>> swapKeys = swap.entrySet().iterator();
while(swapKeys.hasNext()) {
- String key = swapKeys.next();
- record.add(key, swap.get(key).toString());
+ Map.Entry<String, ?> entry = swapKeys.next();
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ record.add(key, value.toString());
}
buildGenericRecord(record, null, cal.getTimeInMillis(), "swap");
output.collect(key, record);
@@ -131,25 +141,28 @@ public class SystemMetrics extends AbstractProcessor {
JSONArray netList = (JSONArray) json.get("network");
for(int i = 0;i < netList.size(); i++) {
JSONObject netIf = (JSONObject) netList.get(i);
- Iterator<String> keys = netIf.keySet().iterator();
+ @SuppressWarnings("unchecked")
+ Iterator<Map.Entry<String, ?>> keys = netIf.entrySet().iterator();
while(keys.hasNext()) {
- String key = keys.next();
- record.add(key + "." + i, netIf.get(key).toString());
+ Map.Entry<String, ?> entry = keys.next();
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ record.add(key + "." + i, value.toString());
if(i!=0) {
if(key.equals("RxBytes")) {
- rxBytes = rxBytes + (Long) netIf.get(key);
+ rxBytes = rxBytes + (Long) value;
} else if(key.equals("RxDropped")) {
- rxDropped = rxDropped + (Long) netIf.get(key);
+ rxDropped = rxDropped + (Long) value;
} else if(key.equals("RxErrors")) {
- rxErrors = rxErrors + (Long) netIf.get(key);
+ rxErrors = rxErrors + (Long) value;
} else if(key.equals("RxPackets")) {
- rxPackets = rxPackets + (Long) netIf.get(key);
+ rxPackets = rxPackets + (Long) value;
} else if(key.equals("TxBytes")) {
- txBytes = txBytes + (Long) netIf.get(key);
+ txBytes = txBytes + (Long) value;
} else if(key.equals("TxCollisions")) {
- txCollisions = txCollisions + (Long) netIf.get(key);
+ txCollisions = txCollisions + (Long) value;
} else if(key.equals("TxErrors")) {
- txErrors = txErrors + (Long) netIf.get(key);
+ txErrors = txErrors + (Long) value;
} else if(key.equals("TxPackets")) {
- txPackets = txPackets + (Long) netIf.get(key);
+ txPackets = txPackets + (Long) value;
}
@@ -177,22 +190,25 @@ public class SystemMetrics extends AbstractProcessor {
JSONArray diskList = (JSONArray) json.get("disk");
for(int i = 0;i < diskList.size(); i++) {
JSONObject disk = (JSONObject) diskList.get(i);
- Iterator<String> keys = disk.keySet().iterator();
+ @SuppressWarnings("unchecked")
+ Iterator<Map.Entry<String, ?>> keys = disk.entrySet().iterator();
while(keys.hasNext()) {
- String key = keys.next();
- record.add(key + "." + i, disk.get(key).toString());
+ Map.Entry<String, ?> entry = keys.next();
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ record.add(key + "." + i, value.toString());
if(key.equals("ReadBytes")) {
- readBytes = readBytes + (Long) disk.get("ReadBytes");
+ readBytes = readBytes + (Long) value;
} else if(key.equals("Reads")) {
- reads = reads + (Long) disk.get("Reads");
+ reads = reads + (Long) value;
} else if(key.equals("WriteBytes")) {
- writeBytes = writeBytes + (Long) disk.get("WriteBytes");
+ writeBytes = writeBytes + (Long) value;
} else if(key.equals("Writes")) {
- writes = writes + (Long) disk.get("Writes");
+ writes = writes + (Long) value;
} else if(key.equals("Total")) {
- total = total + (Long) disk.get("Total");
+ total = total + (Long) value;
} else if(key.equals("Used")) {
- used = used + (Long) disk.get("Used");
+ used = used + (Long) value;
}
}
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java
index 417fbb5..fe050ed 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java
@@ -77,11 +77,13 @@ public class ZookeeperProcessor extends AbstractProcessor {
} else {
timeStamp = Long.parseLong(ttTag);
}
- Iterator<String> keys = ((JSONObject) obj).keySet().iterator();
+ @SuppressWarnings("unchecked")
+ Iterator<Map.Entry<String, ?>> keys = ((JSONObject) obj).entrySet().iterator();
while (keys.hasNext()) {
- String key = keys.next();
- Object value = obj.get(key);
+ Map.Entry<String, ?> entry = keys.next();
+ String key = entry.getKey();
+ Object value = entry.getValue();
String valueString = value == null ? "" : value.toString();
if (metricsMap.containsKey(key)) {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java
index 4002c6c..5c658dc 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java
@@ -48,7 +48,7 @@ public class ClientTrace implements ReduceProcessor {
while (values.hasNext()) {
/* aggregate bytes for current key */
rec = values.next();
- bytes += Long.valueOf(rec.getValue("bytes"));
+ bytes += Long.parseLong(rec.getValue("bytes"));
/* output raw values to different data type for uses which
* require detailed per-operation data */
@@ -70,7 +70,7 @@ public class ClientTrace implements ReduceProcessor {
String[] k = key.getKey().split("/");
emit.add(k[1] + "_" + k[2], String.valueOf(bytes));
- emit.setTime(Long.valueOf(k[3]));
+ emit.setTime(Long.parseLong(k[3]));
output.collect(key, emit);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java
index 5e8814c..c5050f2 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
+
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
@@ -73,10 +75,12 @@ public class MRJobReduceProcessor implements ReduceProcessor {
newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
newRecord.setTime(initTime);
newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
- Iterator<String> it = data.keySet().iterator();
+ Iterator<Map.Entry<String, String>> it = data.entrySet().iterator();
while (it.hasNext()) {
- String field = it.next();
- newRecord.add(field, data.get(field));
+ Map.Entry<String, ?> entry = it.next();
+ String field = entry.getKey();
+ String value = entry.getValue().toString();
+ newRecord.add(field, value);
}
output.collect(newKey, newRecord);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java
index 4fdb365..a884cc1 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java
@@ -19,10 +19,12 @@
package org.apache.hadoop.chukwa.extraction.engine;
+import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+
import org.apache.hadoop.record.Buffer;
public class ChukwaRecord extends ChukwaRecordJT implements Record {
@@ -32,10 +34,10 @@ public class ChukwaRecord extends ChukwaRecordJT implements Record {
public void add(String key, String value) {
synchronized (this) {
if (this.mapFields == null) {
- this.mapFields = new TreeMap<String, org.apache.hadoop.record.Buffer>();
+ this.mapFields = new TreeMap<String, Buffer>();
}
}
- this.mapFields.put(key, new Buffer(value.getBytes()));
+ this.mapFields.put(key, new Buffer(value.getBytes(Charset.forName("UTF-8"))));
}
public String[] getFields() {
@@ -44,7 +46,7 @@ public class ChukwaRecord extends ChukwaRecordJT implements Record {
public String getValue(String field) {
if (this.mapFields.containsKey(field)) {
- return new String(this.mapFields.get(field).get());
+ return new String(this.mapFields.get(field).get(), Charset.forName("UTF-8"));
} else {
return null;
}
@@ -77,7 +79,7 @@ public class ChukwaRecord extends ChukwaRecordJT implements Record {
while (it.hasNext()) {
entry = it.next();
key = entry.getKey().intern();
- val = new String(entry.getValue().get());
+ val = new String(entry.getValue().get(), Charset.forName("UTF-8"));
if (key.intern() == Record.bodyField.intern()) {
hasBody = true;
bodyVal = val;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java
index b2660a2..04cca36 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java
@@ -19,8 +19,11 @@
// File generated by hadoop record compiler. Do not edit.
package org.apache.hadoop.chukwa.extraction.engine;
+import java.io.Serializable;
-public class ChukwaRecordJT extends org.apache.hadoop.record.Record {
+
+public class ChukwaRecordJT extends org.apache.hadoop.record.Record implements Serializable {
+ private static final long serialVersionUID = 15015L;
private static final org.apache.hadoop.record.meta.RecordTypeInfo _rio_recTypeInfo;
private static org.apache.hadoop.record.meta.RecordTypeInfo _rio_rtiFilter;
private static int[] _rio_rtiFilterFields;
@@ -236,6 +239,7 @@ public class ChukwaRecordJT extends org.apache.hadoop.record.Record {
}
public Object clone() throws CloneNotSupportedException {
+ super.clone();
ChukwaRecordJT _rio_other = new ChukwaRecordJT();
_rio_other.time = this.time;
_rio_other.mapFields = (java.util.TreeMap<String, org.apache.hadoop.record.Buffer>) this.mapFields
@@ -258,7 +262,7 @@ public class ChukwaRecordJT extends org.apache.hadoop.record.Record {
}
public static class Comparator extends
- org.apache.hadoop.record.RecordComparator {
+ org.apache.hadoop.record.RecordComparator implements Serializable {
public Comparator() {
super(ChukwaRecordJT.class);
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java
index 7bc6718..0e602d7 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java
@@ -178,6 +178,7 @@ public class ChukwaRecordKey extends org.apache.hadoop.record.Record {
}
public Object clone() throws CloneNotSupportedException {
+ super.clone();
ChukwaRecordKey _rio_other = new ChukwaRecordKey();
_rio_other.reduceType = this.reduceType;
_rio_other.key = this.key;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java
index 5b71a61..eb14414 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java
@@ -21,4 +21,8 @@ package org.apache.hadoop.chukwa.extraction.engine;
public class Token {
public String key = null;
public boolean hasMore = false;
+
+ public boolean getMore() {
+ return hasMore;
+ }
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java
index 68dbb2c..dc0c576 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java
@@ -37,7 +37,7 @@ public class DsDirectory {
private DataConfig dataConfig = null;
private static FileSystem fs = null;
- private static Configuration conf = null;
+ private Configuration conf = null;
private DsDirectory() {
dataConfig = new DataConfig();
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java
index c2602b4..bc0f83d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java
@@ -45,6 +45,9 @@ import org.apache.commons.logging.LogFactory;
public class DatabaseDS implements DataSource {
private static final Log log = LogFactory.getLog(DatabaseDS.class);
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
+ "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE",
+ justification = "Dynamic based upon tables in the database")
public SearchResult search(SearchResult result, String cluster,
String dataSource, long t0, long t1, String filter, Token token)
throws DataSourceException {
@@ -60,8 +63,6 @@ public class DatabaseDS implements DataSource {
timeField = "LAUNCH_TIME";
} else if (dataSource.equalsIgnoreCase("HodJob")) {
timeField = "StartTime";
- } else if (dataSource.equalsIgnoreCase("QueueInfo")) {
- timeField = "timestamp";
} else {
timeField = "timestamp";
}
@@ -88,13 +89,16 @@ public class DatabaseDS implements DataSource {
int col = rmeta.getColumnCount();
while (rs.next()) {
ChukwaRecord event = new ChukwaRecord();
- String cell = "";
+ StringBuilder cell = new StringBuilder();;
long timestamp = 0;
for (int i = 1; i < col; i++) {
String value = rs.getString(i);
if (value != null) {
- cell = cell + " " + rmeta.getColumnName(i) + ":" + value;
+ cell.append(" ");
+ cell.append(rmeta.getColumnName(i));
+ cell.append(":");
+ cell.append(value);
}
if (rmeta.getColumnName(i).equals(timeField)) {
timestamp = rs.getLong(i);
@@ -111,7 +115,7 @@ public class DatabaseDS implements DataSource {
continue;
}
- event.add(Record.bodyField, cell);
+ event.add(Record.bodyField, cell.toString());
event.add(Record.sourceField, cluster + "." + dataSource);
if (records.containsKey(timestamp)) {
records.get(timestamp).add(event);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java
index bb1797b..f197420 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java
@@ -34,4 +34,12 @@ public class ChukwaDSInternalResult {
String fileName = null;
ChukwaRecordKey key = null;
+
+ public ChukwaRecordKey getKey() {
+ return key;
+ }
+
+ protected void setKey(ChukwaRecordKey key) {
+ this.key = key;
+ }
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java
index dc23ef6..799390b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java
@@ -131,12 +131,14 @@ public class ChukwaFileParser {
}
} while (line != null);
- } catch (Exception e) {
+ } catch (IOException e) {
e.printStackTrace();
} finally {
System.out.println("File: " + fileName + " Line count: " + lineCount);
try {
- dataIS.close();
+ if(dataIS != null) {
+ dataIS.close();
+ }
} catch (IOException e) {
}
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java
index 7c9e02c..93fdd2a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java
@@ -26,7 +26,10 @@ import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.TreeMap;
+
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -150,7 +153,7 @@ public class ChukwaRecordDataSource implements DataSource {
{
log.debug("check for hours");
for (int hour = 0; hour < 24; hour++) {
- if (workingDay == res.day && hour < workingHour) {
+ if (workingDay.equals(res.day) && hour < workingHour) {
continue;
}
log.debug(" Hour? -->" + filePath + dataSource + "/"
@@ -349,7 +352,7 @@ public class ChukwaRecordDataSource implements DataSource {
}
}
- } catch (Exception e) {
+ } catch (IOException e) {
e.printStackTrace();
} finally {
try {
@@ -375,7 +378,10 @@ public class ChukwaRecordDataSource implements DataSource {
contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
+ "/" + workingHour + "/rotateDone"));
break;
-
+ default:
+ contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
+ + "/rotateDone"));
+ break;
}
return contains;
}
@@ -400,7 +406,10 @@ public class ChukwaRecordDataSource implements DataSource {
contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
+ "/" + workingHour + "/" + raw));
break;
-
+ default:
+ contains = fs
+ .exists(new Path(rootFolder + dataSource + "/" + workingDay));
+ break;
}
return contains;
}
@@ -440,6 +449,10 @@ public class ChukwaRecordDataSource implements DataSource {
+ raws[rawIndex] + "/" + dataSource + "_" + day + "_" + hour + "_"
+ raws[rawIndex] + "." + spill + ".evt";
break;
+ default:
+ fileName = rootFolder + "/" + dataSource + "/" + day + "/" + dataSource
+ + "_" + day + "." + spill + ".evt";
+ break;
}
log.debug("buildFileName :" + fileName);
return fileName;
@@ -473,12 +486,10 @@ public class ChukwaRecordDataSource implements DataSource {
ds.search(result, cluster, dataSource, t0, t1, filter, token);
TreeMap<Long, List<Record>> records = result.getRecords();
- Iterator<Long> it = records.keySet().iterator();
-
- while (it.hasNext()) {
- long ts = it.next();
+ for(Entry<Long, List<Record>> entry : records.entrySet()) {
+ long ts = entry.getKey();
System.out.println("\n\nTimestamp: " + new Date(ts));
- List<Record> list = records.get(ts);
+ List<Record> list = entry.getValue();
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java
index dbaadc2..59b8dcd 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java
@@ -118,7 +118,7 @@ public class ChukwaSequenceFileParser {
}
}
- } catch (Exception e) {
+ } catch (IOException e) {
e.printStackTrace();
} finally {
System.out.println("File: " + fileName + " Line count: " + lineCount);
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java b/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java
index f721978..48c578a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.chukwa.hicc;
import java.net.*;
+import java.nio.charset.Charset;
import java.text.ParseException;
import java.io.*;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
-
import org.apache.log4j.Logger;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
@@ -43,7 +43,7 @@ public class JSONLoader {
// FileReader always assumes default encoding is OK!
URL yahoo = new URL(source);
BufferedReader in = new BufferedReader(new InputStreamReader(yahoo
- .openStream()));
+ .openStream(), Charset.forName("UTF-8")));
String inputLine;
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java b/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java
index b3e7bf5..1e98989 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java
@@ -52,10 +52,8 @@ public class OfflineTimeHandler {
public void init(HashMap<String, String> map) {
Calendar now = Calendar.getInstance();
- if (map == null || (map != null
- && map.get("time_type") == null
- && map.get("time_type") == null
- && map.get("period") == null)) {
+ if (map == null ||
+ (map.get("time_type") == null && map.get("period") == null)) {
end = now.getTimeInMillis();
start = end - 60 * 60 * 1000;
} else if (map.get("period") != null
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java b/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java
index dbb6707..19e537d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.chukwa.hicc;
import java.io.*;
+import java.nio.charset.Charset;
import java.util.*;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
@@ -43,7 +43,7 @@ public class Views {
try {
// use buffering, reading one line at a time
// FileReader always assumes default encoding is OK!
- BufferedReader input = new BufferedReader(new FileReader(aFile));
+ BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8")));
try {
String line = null; // not declared within while loop
/*