You are viewing a plain-text version of this content. The hyperlink to its canonical version ("here") was not preserved in this plain-text rendering.
Posted to commits@chukwa.apache.org by ey...@apache.org on 2009/12/16 03:50:10 UTC

svn commit: r891104 - in /hadoop/chukwa/trunk: CHANGES.txt src/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java

Author: eyang
Date: Wed Dec 16 02:50:09 2009
New Revision: 891104

URL: http://svn.apache.org/viewvc?rev=891104&view=rev
Log:
CHUKWA-430. Narrow down the list of demux output for FSM to improve processing time. (Eric Yang)

Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=891104&r1=891103&r2=891104&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Wed Dec 16 02:50:09 2009
@@ -34,6 +34,8 @@
 
   BUG FIXES
 
+    CHUKWA-430. Narrow down the list of demux output for FSM to improve processing time. (Eric Yang)
+
     CHUKWA-428. Revised location for Chukwa HDFS repository. (Eric Yang)
 
     CHUKWA-241. Revise chukwa-config.sh so that chukwa runs after building from source. (Eric Yang via asrabkin)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java?rev=891104&r1=891103&r2=891104&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java Wed Dec 16 02:50:09 2009
@@ -81,24 +81,34 @@
           inputPaths.add(temp);
         }
       }
-      String[] args = new String[inputPaths.size()+3];
       String outputDir= conf.get("chukwa.tmp.data.dir")+File.separator+"fsm_"+System.currentTimeMillis()+"_";
       if(inputPaths.size()>0) {
         Configuration fsmConf = new Configuration();
-        args[0]="-in";
-        args[1]=inputPaths.size()+"";
-        int k=2;
-        for(Path temp : inputPaths) {
-          args[k]=temp.toUri().toString();
-          k++;
-        }
         // Run fsm map reduce job for dn, tt, and jobhist.
         for(String mapper : mappers) {
+          String[] args = new String[inputPaths.size()+3];
+          args[0]="-in";
+          int k=2;
+          boolean hasData=false;
+          for(Path temp : inputPaths) {
+            String tempPath = temp.toUri().toString();
+            if((mapper.intern()==mappers[0].intern() && tempPath.indexOf("ClientTraceDetailed")>0) ||
+                (mapper.intern()==mappers[1].intern() && tempPath.indexOf("ClientTraceDetailed")>0) ||
+                (mapper.intern()==mappers[2].intern() && tempPath.indexOf("TaskData")>0) ||
+                (mapper.intern()==mappers[2].intern() && tempPath.indexOf("JobData")>0)) {
+              args[k]=tempPath;
+              k++;
+              hasData=true;
+            }
+          }
+          args[1]=k-2+"";
           fsmConf.set("chukwa.salsa.fsm.mapclass", mapper);
           args[k]=outputDir+mapper;
           Path outputPath = new Path(args[k]);
           outputPaths.add(outputPath);
-          int res = ToolRunner.run(fsmConf, new FSMBuilder(), args);
+          if(hasData) {
+            int res = ToolRunner.run(fsmConf, new FSMBuilder(), args);
+          }
         }
       }
       // Find the mapreduce output and load to MDL.