You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2012/08/01 03:53:22 UTC

svn commit: r1367843 - in /incubator/chukwa/trunk: ./ src/main/java/org/apache/hadoop/chukwa/extraction/ src/main/java/org/apache/hadoop/chukwa/extraction/demux/

Author: asrabkin
Date: Wed Aug  1 01:53:21 2012
New Revision: 1367843

URL: http://svn.apache.org/viewvc?rev=1367843&view=rev
Log:
CHUKWA-648. Make Chukwa Reduce Type to support hierarchy format. Contributed by Jie Huang.

Modified:
    incubator/chukwa/trunk/CHANGES.txt
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java

Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1367843&r1=1367842&r2=1367843&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Wed Aug  1 01:53:21 2012
@@ -6,6 +6,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    CHUKWA-648. Make Chukwa Reduce Type to support hierarchy format. (Jie Huang via asrabkin)
+
     CHUKWA-650. Re-configure Demux ReduceNumber without re-starting the DemuxManager service. (Jie Huang via asrabkin)
     
     CHUKWA-649. Handle pid-file missing case while stopping chukwa processes. (Jie Huang via asrabkin)

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java?rev=1367843&r1=1367842&r2=1367843&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java Wed Aug  1 01:53:21 2012
@@ -65,5 +65,9 @@ public interface CHUKWA_CONSTANT {
 
   public static final String POST_DEMUX_DATA_LOADER = "chukwa.post.demux.data.loader";  
   public static final String INCLUDE_KEY_IN_PARTITIONER = "_";
+  
+  //CHUKWA-648:  Make Chukwa Reduce Type to support hierarchy format
+  //To support Hierarchy datatype
+  public static final String HIERARCHY_CONNECTOR = "-";
   public static final String POST_DEMUX_SUCCESS_ACTION = "chukwa.post.demux.success.action";  
 }

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java?rev=1367843&r1=1367842&r2=1367843&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java Wed Aug  1 01:53:21 2012
@@ -23,6 +23,7 @@ import org.apache.hadoop.chukwa.extracti
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
+import org.apache.hadoop.chukwa.util.HierarchyDataType;
 import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
 import org.apache.log4j.Logger;
 
@@ -33,8 +34,14 @@ public class ChukwaRecordOutputFormat ex
   @Override
   protected String generateFileNameForKeyValue(ChukwaRecordKey key,
       ChukwaRecord record, String name) {
+    //CHUKWA-648:  Make Chukwa Reduce Type to support hierarchy format    
+    //Allow the user to define hierarchy data-type separated by slash mark
+    //Transform the reduceType from
+    // "datatypeLevel1-datatypeLevel2-datatypeLevel3" to
+    // "datatypeLevel1/datatypeLevel2/datatypeLevel3"
     String output = RecordUtil.getClusterName(record) + "/"
-        + key.getReduceType() + "/" + key.getReduceType()
+        + key.getReduceType() + "/"
+        + HierarchyDataType.getHierarchyDataTypeDirectory(key.getReduceType())
         + Util.generateTimeOutput(record.getTime());
 
     // {log.info("ChukwaOutputFormat.fileName: [" + output +"]");}

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java?rev=1367843&r1=1367842&r2=1367843&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java Wed Aug  1 01:53:21 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.chukwa.extracti
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.chukwa.util.HierarchyDataType;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -114,82 +115,87 @@ public class DailyChukwaRecordRolling ex
           + workingDay + "/" + cluster);
       FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
       for (FileStatus dataSourceFS : dataSourcesFS) {
-        String dataSource = dataSourceFS.getPath().getName();
-        // Repo path = reposRootDirectory/<cluster>/<day>/*/*.evt
-
-
-        // put the rotate flag
-        fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
-            + dataSource + "/" + workingDay + "/rotateDone"));
-        
-        if (hourlyRolling(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay) == false) {
-          log.warn("Skipping this directory, hourly not done. " + chukwaMainRepository + "/" + cluster + "/"
-            + dataSource + "/" + workingDay );
-          alldone = false;
-          continue;
-        } 
-        
-        log.info("Running Daily rolling for " + chukwaMainRepository + "/" + cluster + "/"
-            + dataSource + "/" + workingDay + "/rotateDone");
-        
-        // rotate
-        // Merge
-        String[] mergeArgs = new String[5];
-        // input
-        mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
-            + "/" + workingDay + "/[0-9]*/*.evt";
-        // temp dir
-        mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
-            + workingDay + "_" + System.currentTimeMillis();
-        // final output dir
-        mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
-            + "/" + workingDay;
-        // final output fileName
-        mergeArgs[3] = dataSource + "_DailyDone_"  + workingDay;
-        // delete rolling directory
-        mergeArgs[4] = rollingFolder + "/daily/" + workingDay + "/" + cluster
-            + "/" + dataSource;
-
-        log.info("DailyChukwaRecordRolling 0: " + mergeArgs[0]);
-        log.info("DailyChukwaRecordRolling 1: " + mergeArgs[1]);
-        log.info("DailyChukwaRecordRolling 2: " + mergeArgs[2]);
-        log.info("DailyChukwaRecordRolling 3: " + mergeArgs[3]);
-        log.info("DailyChukwaRecordRolling 4: " + mergeArgs[4]);
-
-        RecordMerger merge = new RecordMerger(conf, fs,
-            new DailyChukwaRecordRolling(), mergeArgs, deleteRawdata);
-        List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
-        if (rollInSequence) {
-          merge.run();
-        } else {
-          allMerge.add(merge);
-          merge.start();
-        }
+        //CHUKWA-648:  Make Chukwa Reduce Type to support hierarchy format  
+        for (FileStatus dataSourcePath : HierarchyDataType.globStatus(fs,
+            dataSourceFS.getPath(), true)) {
+          String dataSource = HierarchyDataType.getDataType(
+              dataSourcePath.getPath(),
+              fs.getFileStatus(dataSourceClusterHourPaths).getPath());
+          // Repo path = reposRootDirectory/<cluster>/<day>/*/*.evt
+
+
+          // put the rotate flag
+          fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
+              + dataSource + "/" + workingDay + "/rotateDone"));
+
+          if (hourlyRolling(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay) == false) {
+            log.warn("Skipping this directory, hourly not done. " + chukwaMainRepository + "/" + cluster + "/"
+                + dataSource + "/" + workingDay );
+            alldone = false;
+            continue;
+          } 
+
+          log.info("Running Daily rolling for " + chukwaMainRepository + "/" + cluster + "/"
+              + dataSource + "/" + workingDay + "/rotateDone");
+
+          // rotate
+          // Merge
+          String[] mergeArgs = new String[5];
+          // input
+          mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+              + "/" + workingDay + "/[0-9]*/*.evt";
+          // temp dir
+          mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
+              + workingDay + "_" + System.currentTimeMillis();
+          // final output dir
+          mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+              + "/" + workingDay;
+          // final output fileName
+          mergeArgs[3] = dataSource + "_DailyDone_"  + workingDay;
+          // delete rolling directory
+          mergeArgs[4] = rollingFolder + "/daily/" + workingDay + "/" + cluster
+              + "/" + dataSource;
+
+          log.info("DailyChukwaRecordRolling 0: " + mergeArgs[0]);
+          log.info("DailyChukwaRecordRolling 1: " + mergeArgs[1]);
+          log.info("DailyChukwaRecordRolling 2: " + mergeArgs[2]);
+          log.info("DailyChukwaRecordRolling 3: " + mergeArgs[3]);
+          log.info("DailyChukwaRecordRolling 4: " + mergeArgs[4]);
+
+          RecordMerger merge = new RecordMerger(conf, fs,
+              new DailyChukwaRecordRolling(), mergeArgs, deleteRawdata);
+          List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
+          if (rollInSequence) {
+            merge.run();
+          } else {
+            allMerge.add(merge);
+            merge.start();
+          }
 
-        // join all Threads
-        if (!rollInSequence) {
-          while (allMerge.size() > 0) {
-            RecordMerger m = allMerge.remove(0);
-            try {
-              m.join();
-            } catch (InterruptedException e) {
+          // join all Threads
+          if (!rollInSequence) {
+            while (allMerge.size() > 0) {
+              RecordMerger m = allMerge.remove(0);
+              try {
+                m.join();
+              } catch (InterruptedException e) {
+              }
             }
-          }
-        } // End if (!rollInSequence)
+          } // End if (!rollInSequence)
 
-        // Delete the processed dataSourceFS
+          // Delete the processed dataSourceFS
           FileUtil.fullyDelete(fs, dataSourceFS.getPath());
 
-      } // End for(FileStatus dataSourceFS : dataSourcesFS)
+        } // End for(FileStatus dataSourcePath : HierarchyDataType.globStatus(...))
 
-      // Delete the processed clusterFs
-      if (alldone == true) {
-        FileUtil.fullyDelete(fs, clusterFs.getPath());
-      }
-      
+        // Delete the processed clusterFs
+        if (alldone == true) {
+          FileUtil.fullyDelete(fs, clusterFs.getPath());
+        }
 
-    } // End for(FileStatus clusterFs : clustersFS)
 
+      } // End for(FileStatus dataSourceFS : dataSourcesFS)
+    }
     // Delete the processed dayPath
     if (alldone == true) {
       FileUtil.fullyDelete(fs, dayPath);

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java?rev=1367843&r1=1367842&r2=1367843&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java Wed Aug  1 01:53:21 2012
@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.HierarchyDataType;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -80,59 +81,64 @@ public class HourlyChukwaRecordRolling e
       FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
       
       for (FileStatus dataSourceFS : dataSourcesFS) {
-        String dataSource = dataSourceFS.getPath().getName();
-        // Repo path = reposRootDirectory/<cluster>/<datasource>/<day>/<hour>/*/*.evt
-
-        // put the rotate flag
-        fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
-                + dataSource + "/" + workingDay + "/" + workingHour
-                + "/rotateDone"));
-
-        // rotate
-        // Merge
-        String[] mergeArgs = new String[5];
-        // input
-        mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
-            + "/" + workingDay + "/" + workingHour + "/[0-5]*/*.evt";
-        // temp dir
-        mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
-            + workingDay + "/" + workingHour + "_" + System.currentTimeMillis();
-        // final output dir
-        mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
-            + "/" + workingDay + "/" + workingHour;
-        // final output fileName
-        mergeArgs[3] = dataSource + "_HourlyDone_" + workingDay + "_" + workingHour;
-        // delete rolling directory
-        mergeArgs[4] = rollingFolder + "/hourly/" + workingDay + "/"
-            + workingHour + "/" + cluster + "/" + dataSource;
-
-        log.info("HourlyChukwaRecordRolling 0: " + mergeArgs[0]);
-        log.info("HourlyChukwaRecordRolling 1: " + mergeArgs[1]);
-        log.info("HourlyChukwaRecordRolling 2: " + mergeArgs[2]);
-        log.info("HourlyChukwaRecordRolling 3: " + mergeArgs[3]);
-        log.info("HourlyChukwaRecordRolling 4: " + mergeArgs[4]);
-
-        RecordMerger merge = new RecordMerger(conf, fs,
-            new HourlyChukwaRecordRolling(), mergeArgs, deleteRawdata);
-        List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
-        if (rollInSequence) {
-          merge.run();
-        } else {
-          allMerge.add(merge);
-          merge.start();
-        }
-
-        // join all Threads
-        if (!rollInSequence) {
-          while (allMerge.size() > 0) {
-            RecordMerger m = allMerge.remove(0);
-            try {
-              m.join();
-            } catch (InterruptedException e) {
-            }
+        //CHUKWA-648:  Make Chukwa Reduce Type to support hierarchy format  
+        for (FileStatus dataSourcePath : HierarchyDataType.globStatus(fs,
+            dataSourceFS.getPath(), true)) {
+          String dataSource = HierarchyDataType.getDataType(
+              dataSourcePath.getPath(),
+              fs.getFileStatus(dataSourceClusterHourPaths).getPath());
+          // Repo path = reposRootDirectory/<cluster>/<datasource>/<day>/<hour>/*/*.evt
+
+          // put the rotate flag
+          fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
+              + dataSource + "/" + workingDay + "/" + workingHour
+              + "/rotateDone"));
+
+          // rotate
+          // Merge
+          String[] mergeArgs = new String[5];
+          // input
+          mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+              + "/" + workingDay + "/" + workingHour + "/[0-5]*/*.evt";
+          // temp dir
+          mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
+              + workingDay + "/" + workingHour + "_" + System.currentTimeMillis();
+          // final output dir
+          mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+              + "/" + workingDay + "/" + workingHour;
+          // final output fileName
+          mergeArgs[3] = dataSource + "_HourlyDone_" + workingDay + "_" + workingHour;
+          // delete rolling directory
+          mergeArgs[4] = rollingFolder + "/hourly/" + workingDay + "/"
+              + workingHour + "/" + cluster + "/" + dataSource;
+
+          log.info("HourlyChukwaRecordRolling 0: " + mergeArgs[0]);
+          log.info("HourlyChukwaRecordRolling 1: " + mergeArgs[1]);
+          log.info("HourlyChukwaRecordRolling 2: " + mergeArgs[2]);
+          log.info("HourlyChukwaRecordRolling 3: " + mergeArgs[3]);
+          log.info("HourlyChukwaRecordRolling 4: " + mergeArgs[4]);
+
+          RecordMerger merge = new RecordMerger(conf, fs,
+              new HourlyChukwaRecordRolling(), mergeArgs, deleteRawdata);
+          List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
+          if (rollInSequence) {
+            merge.run();
+          } else {
+            allMerge.add(merge);
+            merge.start();
           }
-        } // End if (!rollInSequence)
 
+          // join all Threads
+          if (!rollInSequence) {
+            while (allMerge.size() > 0) {
+              RecordMerger m = allMerge.remove(0);
+              try {
+                m.join();
+              } catch (InterruptedException e) {
+              }
+            }
+          } // End if (!rollInSequence)
+        }
         // Delete the processed dataSourceFS
         FileUtil.fullyDelete(fs, dataSourceFS.getPath());
 

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java?rev=1367843&r1=1367842&r2=1367843&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java Wed Aug  1 01:53:21 2012
@@ -25,8 +25,11 @@ import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.util.HierarchyDataType;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -66,20 +69,35 @@ public class MoveToRepository {
                   + datasourceDirectory.getPath());
         }
 
-        String dirName = datasourceDirectory.getPath().getName();
-        Path destPath = new Path(destDir + "/" + dirName);
-        log.info("dest directory path: " + destPath);
-        log.info("processClusterDirectory processing Datasource: (" + dirName
-            + ")");
-        destFiles.addAll(processDatasourceDirectory(srcDir.getName(),
-            datasourceDirectory.getPath(), destDir + "/" + dirName));
+        PathFilter filter = new PathFilter()
+        {public boolean accept(Path file) {
+          return file.getName().endsWith(".evt");
+        }  };
+        //CHUKWA-648:  Make Chukwa Reduce Type to support hierarchy format
+        // to processDataSourceDirectory according to hierarchy data type format 
+        List<FileStatus> eventfiles = HierarchyDataType.globStatus(fs, datasourceDirectory.getPath(),filter,true);
+        for (FileStatus eventfile : eventfiles){
+          Path datatypeDir = eventfile.getPath().getParent();
+          String dirName = HierarchyDataType.getDataType(datatypeDir, srcDir);
+        
+          Path destPath = new Path(destDir + "/" + dirName);
+          log.info("dest directory path: " + destPath);
+          log.info("processClusterDirectory processing Datasource: (" + dirName
+              + ")");
+          StringBuilder dtDir = new StringBuilder(srcDir.toString()).append("/").append(dirName);
+          log.debug("srcDir: " + dtDir.toString());
+          processDatasourceDirectory(srcDir.toString(), new Path(dtDir.toString()), destDir + "/" + dirName);
+        }
       }
     }
     return destFiles;
   }
 
-  static Collection<Path> processDatasourceDirectory(String cluster, Path srcDir,
+  static Collection<Path> processDatasourceDirectory(String clusterpath, Path srcDir,
       String destDir) throws Exception {
+    Path cPath = new Path(clusterpath);
+    String cluster = cPath.getName();
+    
     Collection<Path> destFiles = new HashSet<Path>();
     String fileName = null;
     int fileDay = 0;
@@ -97,7 +115,7 @@ public class MoveToRepository {
       log.info("fileName: " + fileName);
 
       int l = fileName.length();
-      String dataSource = srcDir.getName();
+      String dataSource = HierarchyDataType.getDataType(srcDir, cPath);
       log.info("Datasource: " + dataSource);
 
       if (fileName.endsWith(".D.evt")) {
@@ -105,7 +123,9 @@ public class MoveToRepository {
 
         fileDay = Integer.parseInt(fileName.substring(l - 14, l - 6));
         Path destFile = writeRecordFile(destDir + "/" + fileDay + "/",
-            recordFile.getPath(), dataSource + "_" + fileDay);
+            recordFile.getPath(),
+            HierarchyDataType.getHierarchyDataTypeFileName(dataSource) + "_"
+                + fileDay);
         if (destFile != null) {
           destFiles.add(destFile);
         }
@@ -129,8 +149,10 @@ public class MoveToRepository {
         fileDay = Integer.parseInt(day);
         fileHour = Integer.parseInt(hour);
         // rotate there so spill
-        Path destFile = writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/",
-            recordFile.getPath(), dataSource + "_" + fileDay + "_" + fileHour);
+        Path destFile = writeRecordFile(destDir + "/" + fileDay + "/"
+            + fileHour + "/", recordFile.getPath(),
+            HierarchyDataType.getHierarchyDataTypeFileName(dataSource) + "_"
+                + fileDay + "_" + fileHour);
         if (destFile != null) {
           destFiles.add(destFile);
         }
@@ -150,9 +172,11 @@ public class MoveToRepository {
         log.info("fileDay: " + fileDay);
         log.info("fileHour: " + fileHour);
         log.info("fileMin: " + fileMin);
-        Path destFile = writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/"
-            + fileMin, recordFile.getPath(), dataSource + "_" + fileDay + "_"
-            + fileHour + "_" + fileMin);
+        Path destFile = writeRecordFile(
+            destDir + "/" + fileDay + "/" + fileHour + "/" + fileMin,
+            recordFile.getPath(),
+            HierarchyDataType.getHierarchyDataTypeFileName(HierarchyDataType.trimSlash(dataSource))
+            + "_" + fileDay + "_" + fileHour + "_" + fileMin);
         if (destFile != null) {
           destFiles.add(destFile);
         }

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java?rev=1367843&r1=1367842&r2=1367843&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java Wed Aug  1 01:53:21 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.chukwa.dataload
 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.chukwa.util.HierarchyDataType;
 import org.apache.hadoop.chukwa.datatrigger.TriggerAction;
 import org.apache.hadoop.chukwa.datatrigger.TriggerEvent;
 import org.apache.hadoop.fs.FileStatus;
@@ -201,9 +202,17 @@ public class PostProcessorManager implem
         log.info(dataLoaderName+" processing: "+directory);
         StringBuilder dirSearch = new StringBuilder();
         dirSearch.append(directory);
-        dirSearch.append("/*/*/*.evt");
+        dirSearch.append("/*/*");
+        log.debug("dirSearch: " + dirSearch);
         Path demuxDir = new Path(dirSearch.toString());
-        FileStatus[] events = fs.globStatus(demuxDir);
+        // CHUKWA-648:  Make Chukwa Reduce Type to support hierarchy format
+        // List all event files under the hierarchy data-type directory
+        PathFilter filter = new PathFilter()
+        {public boolean accept(Path file) {
+          return file.getName().endsWith(".evt");
+        }  };
+        List<FileStatus> eventfiles = HierarchyDataType.globStatus(fs, demuxDir,filter,true);
+        FileStatus[] events = eventfiles.toArray(new FileStatus[eventfiles.size()]);
         dataloader.load(conf, fs, events);
       }
     } catch(Exception e) {