You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2012/08/01 03:53:22 UTC
svn commit: r1367843 - in /incubator/chukwa/trunk: ./
src/main/java/org/apache/hadoop/chukwa/extraction/
src/main/java/org/apache/hadoop/chukwa/extraction/demux/
Author: asrabkin
Date: Wed Aug 1 01:53:21 2012
New Revision: 1367843
URL: http://svn.apache.org/viewvc?rev=1367843&view=rev
Log:
CHUKWA-648. Make Chukwa Reduce Type to support hierarchy format. Contributed by Jie Huang.
Modified:
incubator/chukwa/trunk/CHANGES.txt
incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java
incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java
incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1367843&r1=1367842&r2=1367843&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Wed Aug 1 01:53:21 2012
@@ -6,6 +6,8 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ CHUKWA-648. Make Chukwa Reduce Type to support hierarchy format. (Jie Huang via asrabkin)
+
CHUKWA-650. Re-configure Demux ReduceNumber without re-starting the DemuxManager service. (Jie Huang via asrabkin)
CHUKWA-649. Handle pid-file missing case while stopping chukwa processes. (Jie Huang via asrabkin)
Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java?rev=1367843&r1=1367842&r2=1367843&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java Wed Aug 1 01:53:21 2012
@@ -65,5 +65,9 @@ public interface CHUKWA_CONSTANT {
public static final String POST_DEMUX_DATA_LOADER = "chukwa.post.demux.data.loader";
public static final String INCLUDE_KEY_IN_PARTITIONER = "_";
+
+ //CHUKWA-648: Make Chukwa Reduce Type to support hierarchy format
+ //To support Hierarchy datatype
+ public static final String HIERARCHY_CONNECTOR = "-";
public static final String POST_DEMUX_SUCCESS_ACTION = "chukwa.post.demux.success.action";
}
Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java?rev=1367843&r1=1367842&r2=1367843&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java Wed Aug 1 01:53:21 2012
@@ -23,6 +23,7 @@ import org.apache.hadoop.chukwa.extracti
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
+import org.apache.hadoop.chukwa.util.HierarchyDataType;
import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
import org.apache.log4j.Logger;
@@ -33,8 +34,14 @@ public class ChukwaRecordOutputFormat ex
@Override
protected String generateFileNameForKeyValue(ChukwaRecordKey key,
ChukwaRecord record, String name) {
+ //CHUKWA-648: Make Chukwa Reduce Type to support hierarchy format
+ //Allow the user to define hierarchy data-type separated by slash mark
+ //Transform the reduceType from
+ // "datatypeLevel1-datatypeLevel2-datatypeLevel3" to
+ // "datatypeLevel1/datatypeLevel2/datatypeLevel3"
String output = RecordUtil.getClusterName(record) + "/"
- + key.getReduceType() + "/" + key.getReduceType()
+ + key.getReduceType() + "/"
+ + HierarchyDataType.getHierarchyDataTypeDirectory(key.getReduceType())
+ Util.generateTimeOutput(record.getTime());
// {log.info("ChukwaOutputFormat.fileName: [" + output +"]");}
Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java?rev=1367843&r1=1367842&r2=1367843&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java Wed Aug 1 01:53:21 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.chukwa.extracti
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.chukwa.util.HierarchyDataType;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -114,82 +115,87 @@ public class DailyChukwaRecordRolling ex
+ workingDay + "/" + cluster);
FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
for (FileStatus dataSourceFS : dataSourcesFS) {
- String dataSource = dataSourceFS.getPath().getName();
- // Repo path = reposRootDirectory/<cluster>/<day>/*/*.evt
-
-
- // put the rotate flag
- fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
- + dataSource + "/" + workingDay + "/rotateDone"));
-
- if (hourlyRolling(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay) == false) {
- log.warn("Skipping this directory, hourly not done. " + chukwaMainRepository + "/" + cluster + "/"
- + dataSource + "/" + workingDay );
- alldone = false;
- continue;
- }
-
- log.info("Running Daily rolling for " + chukwaMainRepository + "/" + cluster + "/"
- + dataSource + "/" + workingDay + "/rotateDone");
-
- // rotate
- // Merge
- String[] mergeArgs = new String[5];
- // input
- mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
- + "/" + workingDay + "/[0-9]*/*.evt";
- // temp dir
- mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
- + workingDay + "_" + System.currentTimeMillis();
- // final output dir
- mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
- + "/" + workingDay;
- // final output fileName
- mergeArgs[3] = dataSource + "_DailyDone_" + workingDay;
- // delete rolling directory
- mergeArgs[4] = rollingFolder + "/daily/" + workingDay + "/" + cluster
- + "/" + dataSource;
-
- log.info("DailyChukwaRecordRolling 0: " + mergeArgs[0]);
- log.info("DailyChukwaRecordRolling 1: " + mergeArgs[1]);
- log.info("DailyChukwaRecordRolling 2: " + mergeArgs[2]);
- log.info("DailyChukwaRecordRolling 3: " + mergeArgs[3]);
- log.info("DailyChukwaRecordRolling 4: " + mergeArgs[4]);
-
- RecordMerger merge = new RecordMerger(conf, fs,
- new DailyChukwaRecordRolling(), mergeArgs, deleteRawdata);
- List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
- if (rollInSequence) {
- merge.run();
- } else {
- allMerge.add(merge);
- merge.start();
- }
+ //CHUKWA-648: Make Chukwa Reduce Type to support hierarchy format
+ for (FileStatus dataSourcePath : HierarchyDataType.globStatus(fs,
+ dataSourceFS.getPath(), true)) {
+ String dataSource = HierarchyDataType.getDataType(
+ dataSourcePath.getPath(),
+ fs.getFileStatus(dataSourceClusterHourPaths).getPath());
+ // Repo path = reposRootDirectory/<cluster>/<day>/*/*.evt
+
+
+ // put the rotate flag
+ fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
+ + dataSource + "/" + workingDay + "/rotateDone"));
+
+ if (hourlyRolling(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay) == false) {
+ log.warn("Skipping this directory, hourly not done. " + chukwaMainRepository + "/" + cluster + "/"
+ + dataSource + "/" + workingDay );
+ alldone = false;
+ continue;
+ }
+
+ log.info("Running Daily rolling for " + chukwaMainRepository + "/" + cluster + "/"
+ + dataSource + "/" + workingDay + "/rotateDone");
+
+ // rotate
+ // Merge
+ String[] mergeArgs = new String[5];
+ // input
+ mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+ + "/" + workingDay + "/[0-9]*/*.evt";
+ // temp dir
+ mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
+ + workingDay + "_" + System.currentTimeMillis();
+ // final output dir
+ mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+ + "/" + workingDay;
+ // final output fileName
+ mergeArgs[3] = dataSource + "_DailyDone_" + workingDay;
+ // delete rolling directory
+ mergeArgs[4] = rollingFolder + "/daily/" + workingDay + "/" + cluster
+ + "/" + dataSource;
+
+ log.info("DailyChukwaRecordRolling 0: " + mergeArgs[0]);
+ log.info("DailyChukwaRecordRolling 1: " + mergeArgs[1]);
+ log.info("DailyChukwaRecordRolling 2: " + mergeArgs[2]);
+ log.info("DailyChukwaRecordRolling 3: " + mergeArgs[3]);
+ log.info("DailyChukwaRecordRolling 4: " + mergeArgs[4]);
+
+ RecordMerger merge = new RecordMerger(conf, fs,
+ new DailyChukwaRecordRolling(), mergeArgs, deleteRawdata);
+ List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
+ if (rollInSequence) {
+ merge.run();
+ } else {
+ allMerge.add(merge);
+ merge.start();
+ }
- // join all Threads
- if (!rollInSequence) {
- while (allMerge.size() > 0) {
- RecordMerger m = allMerge.remove(0);
- try {
- m.join();
- } catch (InterruptedException e) {
+ // join all Threads
+ if (!rollInSequence) {
+ while (allMerge.size() > 0) {
+ RecordMerger m = allMerge.remove(0);
+ try {
+ m.join();
+ } catch (InterruptedException e) {
+ }
}
- }
- } // End if (!rollInSequence)
+ } // End if (!rollInSequence)
- // Delete the processed dataSourceFS
+ // Delete the processed dataSourceFS
FileUtil.fullyDelete(fs, dataSourceFS.getPath());
- } // End for(FileStatus dataSourceFS : dataSourcesFS)
+ } // End for(FileStatus dataSourceFS : dataSourcesFS)
- // Delete the processed clusterFs
- if (alldone == true) {
- FileUtil.fullyDelete(fs, clusterFs.getPath());
- }
-
+ // Delete the processed clusterFs
+ if (alldone == true) {
+ FileUtil.fullyDelete(fs, clusterFs.getPath());
+ }
- } // End for(FileStatus clusterFs : clustersFS)
+ } // End for(FileStatus clusterFs : clustersFS)
+ }
// Delete the processed dayPath
if (alldone == true) {
FileUtil.fullyDelete(fs, dayPath);
Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java?rev=1367843&r1=1367842&r2=1367843&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java Wed Aug 1 01:53:21 2012
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.HierarchyDataType;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
@@ -80,59 +81,64 @@ public class HourlyChukwaRecordRolling e
FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
for (FileStatus dataSourceFS : dataSourcesFS) {
- String dataSource = dataSourceFS.getPath().getName();
- // Repo path = reposRootDirectory/<cluster>/<datasource>/<day>/<hour>/*/*.evt
-
- // put the rotate flag
- fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
- + dataSource + "/" + workingDay + "/" + workingHour
- + "/rotateDone"));
-
- // rotate
- // Merge
- String[] mergeArgs = new String[5];
- // input
- mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
- + "/" + workingDay + "/" + workingHour + "/[0-5]*/*.evt";
- // temp dir
- mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
- + workingDay + "/" + workingHour + "_" + System.currentTimeMillis();
- // final output dir
- mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
- + "/" + workingDay + "/" + workingHour;
- // final output fileName
- mergeArgs[3] = dataSource + "_HourlyDone_" + workingDay + "_" + workingHour;
- // delete rolling directory
- mergeArgs[4] = rollingFolder + "/hourly/" + workingDay + "/"
- + workingHour + "/" + cluster + "/" + dataSource;
-
- log.info("HourlyChukwaRecordRolling 0: " + mergeArgs[0]);
- log.info("HourlyChukwaRecordRolling 1: " + mergeArgs[1]);
- log.info("HourlyChukwaRecordRolling 2: " + mergeArgs[2]);
- log.info("HourlyChukwaRecordRolling 3: " + mergeArgs[3]);
- log.info("HourlyChukwaRecordRolling 4: " + mergeArgs[4]);
-
- RecordMerger merge = new RecordMerger(conf, fs,
- new HourlyChukwaRecordRolling(), mergeArgs, deleteRawdata);
- List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
- if (rollInSequence) {
- merge.run();
- } else {
- allMerge.add(merge);
- merge.start();
- }
-
- // join all Threads
- if (!rollInSequence) {
- while (allMerge.size() > 0) {
- RecordMerger m = allMerge.remove(0);
- try {
- m.join();
- } catch (InterruptedException e) {
- }
+ //CHUKWA-648: Make Chukwa Reduce Type to support hierarchy format
+ for (FileStatus dataSourcePath : HierarchyDataType.globStatus(fs,
+ dataSourceFS.getPath(), true)) {
+ String dataSource = HierarchyDataType.getDataType(
+ dataSourcePath.getPath(),
+ fs.getFileStatus(dataSourceClusterHourPaths).getPath());
+ // Repo path = reposRootDirectory/<cluster>/<datasource>/<day>/<hour>/*/*.evt
+
+ // put the rotate flag
+ fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
+ + dataSource + "/" + workingDay + "/" + workingHour
+ + "/rotateDone"));
+
+ // rotate
+ // Merge
+ String[] mergeArgs = new String[5];
+ // input
+ mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+ + "/" + workingDay + "/" + workingHour + "/[0-5]*/*.evt";
+ // temp dir
+ mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
+ + workingDay + "/" + workingHour + "_" + System.currentTimeMillis();
+ // final output dir
+ mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+ + "/" + workingDay + "/" + workingHour;
+ // final output fileName
+ mergeArgs[3] = dataSource + "_HourlyDone_" + workingDay + "_" + workingHour;
+ // delete rolling directory
+ mergeArgs[4] = rollingFolder + "/hourly/" + workingDay + "/"
+ + workingHour + "/" + cluster + "/" + dataSource;
+
+ log.info("HourlyChukwaRecordRolling 0: " + mergeArgs[0]);
+ log.info("HourlyChukwaRecordRolling 1: " + mergeArgs[1]);
+ log.info("HourlyChukwaRecordRolling 2: " + mergeArgs[2]);
+ log.info("HourlyChukwaRecordRolling 3: " + mergeArgs[3]);
+ log.info("HourlyChukwaRecordRolling 4: " + mergeArgs[4]);
+
+ RecordMerger merge = new RecordMerger(conf, fs,
+ new HourlyChukwaRecordRolling(), mergeArgs, deleteRawdata);
+ List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
+ if (rollInSequence) {
+ merge.run();
+ } else {
+ allMerge.add(merge);
+ merge.start();
}
- } // End if (!rollInSequence)
+ // join all Threads
+ if (!rollInSequence) {
+ while (allMerge.size() > 0) {
+ RecordMerger m = allMerge.remove(0);
+ try {
+ m.join();
+ } catch (InterruptedException e) {
+ }
+ }
+ } // End if (!rollInSequence)
+ }
// Delete the processed dataSourceFS
FileUtil.fullyDelete(fs, dataSourceFS.getPath());
Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java?rev=1367843&r1=1367842&r2=1367843&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java Wed Aug 1 01:53:21 2012
@@ -25,8 +25,11 @@ import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.util.HierarchyDataType;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -66,20 +69,35 @@ public class MoveToRepository {
+ datasourceDirectory.getPath());
}
- String dirName = datasourceDirectory.getPath().getName();
- Path destPath = new Path(destDir + "/" + dirName);
- log.info("dest directory path: " + destPath);
- log.info("processClusterDirectory processing Datasource: (" + dirName
- + ")");
- destFiles.addAll(processDatasourceDirectory(srcDir.getName(),
- datasourceDirectory.getPath(), destDir + "/" + dirName));
+ PathFilter filter = new PathFilter()
+ {public boolean accept(Path file) {
+ return file.getName().endsWith(".evt");
+ } };
+ //CHUKWA-648: Make Chukwa Reduce Type to support hierarchy format
+ // to processDataSourceDirectory according to hierarchy data type format
+ List<FileStatus> eventfiles = HierarchyDataType.globStatus(fs, datasourceDirectory.getPath(),filter,true);
+ for (FileStatus eventfile : eventfiles){
+ Path datatypeDir = eventfile.getPath().getParent();
+ String dirName = HierarchyDataType.getDataType(datatypeDir, srcDir);
+
+ Path destPath = new Path(destDir + "/" + dirName);
+ log.info("dest directory path: " + destPath);
+ log.info("processClusterDirectory processing Datasource: (" + dirName
+ + ")");
+ StringBuilder dtDir = new StringBuilder(srcDir.toString()).append("/").append(dirName);
+ log.debug("srcDir: " + dtDir.toString());
+ processDatasourceDirectory(srcDir.toString(), new Path(dtDir.toString()), destDir + "/" + dirName);
+ }
}
}
return destFiles;
}
- static Collection<Path> processDatasourceDirectory(String cluster, Path srcDir,
+ static Collection<Path> processDatasourceDirectory(String clusterpath, Path srcDir,
String destDir) throws Exception {
+ Path cPath = new Path(clusterpath);
+ String cluster = cPath.getName();
+
Collection<Path> destFiles = new HashSet<Path>();
String fileName = null;
int fileDay = 0;
@@ -97,7 +115,7 @@ public class MoveToRepository {
log.info("fileName: " + fileName);
int l = fileName.length();
- String dataSource = srcDir.getName();
+ String dataSource = HierarchyDataType.getDataType(srcDir, cPath);
log.info("Datasource: " + dataSource);
if (fileName.endsWith(".D.evt")) {
@@ -105,7 +123,9 @@ public class MoveToRepository {
fileDay = Integer.parseInt(fileName.substring(l - 14, l - 6));
Path destFile = writeRecordFile(destDir + "/" + fileDay + "/",
- recordFile.getPath(), dataSource + "_" + fileDay);
+ recordFile.getPath(),
+ HierarchyDataType.getHierarchyDataTypeFileName(dataSource) + "_"
+ + fileDay);
if (destFile != null) {
destFiles.add(destFile);
}
@@ -129,8 +149,10 @@ public class MoveToRepository {
fileDay = Integer.parseInt(day);
fileHour = Integer.parseInt(hour);
// rotate there so spill
- Path destFile = writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/",
- recordFile.getPath(), dataSource + "_" + fileDay + "_" + fileHour);
+ Path destFile = writeRecordFile(destDir + "/" + fileDay + "/"
+ + fileHour + "/", recordFile.getPath(),
+ HierarchyDataType.getHierarchyDataTypeFileName(dataSource) + "_"
+ + fileDay + "_" + fileHour);
if (destFile != null) {
destFiles.add(destFile);
}
@@ -150,9 +172,11 @@ public class MoveToRepository {
log.info("fileDay: " + fileDay);
log.info("fileHour: " + fileHour);
log.info("fileMin: " + fileMin);
- Path destFile = writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/"
- + fileMin, recordFile.getPath(), dataSource + "_" + fileDay + "_"
- + fileHour + "_" + fileMin);
+ Path destFile = writeRecordFile(
+ destDir + "/" + fileDay + "/" + fileHour + "/" + fileMin,
+ recordFile.getPath(),
+ HierarchyDataType.getHierarchyDataTypeFileName(HierarchyDataType.trimSlash(dataSource))
+ + "_" + fileDay + "_" + fileHour + "_" + fileMin);
if (destFile != null) {
destFiles.add(destFile);
}
Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java?rev=1367843&r1=1367842&r2=1367843&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java Wed Aug 1 01:53:21 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.chukwa.dataload
import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.chukwa.util.HierarchyDataType;
import org.apache.hadoop.chukwa.datatrigger.TriggerAction;
import org.apache.hadoop.chukwa.datatrigger.TriggerEvent;
import org.apache.hadoop.fs.FileStatus;
@@ -201,9 +202,17 @@ public class PostProcessorManager implem
log.info(dataLoaderName+" processing: "+directory);
StringBuilder dirSearch = new StringBuilder();
dirSearch.append(directory);
- dirSearch.append("/*/*/*.evt");
+ dirSearch.append("/*/*");
+ log.debug("dirSearch: " + dirSearch);
Path demuxDir = new Path(dirSearch.toString());
- FileStatus[] events = fs.globStatus(demuxDir);
+ // CHUKWA-648: Make Chukwa Reduce Type to support hierarchy format
+ // List all event files under the hierarchy data-type directory
+ PathFilter filter = new PathFilter()
+ {public boolean accept(Path file) {
+ return file.getName().endsWith(".evt");
+ } };
+ List<FileStatus> eventfiles = HierarchyDataType.globStatus(fs, demuxDir,filter,true);
+ FileStatus[] events = eventfiles.toArray(new FileStatus[eventfiles.size()]);
dataloader.load(conf, fs, events);
}
} catch(Exception e) {