You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2009/02/27 19:23:01 UTC

svn commit: r748628 - in /hadoop/core/trunk/src/contrib/chukwa: bin/mdl.sh src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java

Author: eyang
Date: Fri Feb 27 18:23:01 2009
New Revision: 748628

URL: http://svn.apache.org/viewvc?rev=748628&view=rev
Log:
HADOOP-5029.  Added mdl script to manually load chukwa sequence file to database.

Added:
    hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh
Modified:
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java

Added: hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh?rev=748628&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh (added)
+++ hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh Fri Feb 27 18:23:01 2009
@@ -0,0 +1,40 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# mdl.sh: manually load a Chukwa sequence file into the database by running
+# org.apache.hadoop.chukwa.extraction.database.MetricDataLoader.
+#
+# Usage: mdl.sh <cluster name> <chukwa sequence file>
+
+# Locate the directory holding this script and source chukwa-config.sh from it.
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+. "$bin"/chukwa-config.sh
+
+# Both arguments are required; print usage and fail otherwise.
+if [ $# -lt 2 ]; then
+    echo "Usage: mdl.sh <cluster name> <chukwa sequence file>"
+    echo ""
+    exit 1
+fi
+
+# NOTE(review): the classpath below hard-codes a jar under a developer home
+# directory (/homes/eyang); it will not exist on other hosts and should be
+# replaced with a variable from chukwa-config.sh -- confirm the right name.
+# "$@" is quoted so an argument containing whitespace (e.g. a sequence file
+# path with spaces) reaches MetricDataLoader as a single argument.
+${JAVA_HOME}/bin/java -Xms2048M -Xmx3096M -DDATACONFIG=${CHUKWA_CONF_DIR}/mdl.xml -Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DAPP=MDL -Dlog4j.configuration=chukwa-log4j.properties -classpath ${CLASSPATH}:/homes/eyang/chukwa-core-0.1.1.jar:${HADOOP_JAR}:${COMMON}:${tools}:${CHUKWA_HOME}/conf org.apache.hadoop.chukwa.extraction.database.MetricDataLoader "$@"
+

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java?rev=748628&r1=748627&r2=748628&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java Fri Feb 27 18:23:01 2009
@@ -57,6 +57,7 @@
      private static HashMap<String, String> dbTables = null;
      private HashMap<String, HashMap<String,Integer>> dbSchema = null;
      private static String newSpace="-";
+     private static boolean batchMode = true;
 
      /** Creates a new instance of DBWriter */
     public MetricDataLoader() {        
@@ -146,31 +147,30 @@
     
     public void process(Path source)  throws IOException, URISyntaxException, SQLException {
 
-		System.out.println("Input file:" + source.getName());
+        System.out.println("Input file:" + source.getName());
 
-		ChukwaConfiguration conf = new ChukwaConfiguration();
-		String fsName = conf.get("writer.hdfs.filesystem");
-		FileSystem fs = FileSystem.get(new URI(fsName), conf);
+        ChukwaConfiguration conf = new ChukwaConfiguration();
+        String fsName = conf.get("writer.hdfs.filesystem");
+        FileSystem fs = FileSystem.get(new URI(fsName), conf);
 
-		SequenceFile.Reader r = 
+        SequenceFile.Reader r = 
 			new SequenceFile.Reader(fs,source, conf);
 
         stmt = conn.createStatement(); 
         conn.setAutoCommit(false);
         
-		ChukwaRecordKey key = new ChukwaRecordKey();
-		ChukwaRecord record = new ChukwaRecord();
-		try
-		{
-			while (r.next(key, record))
-			{
-				boolean isSuccessful=true;
-                String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
-				log.debug("Timestamp: " + record.getTime());
-				log.debug("DataType: " + key.getReduceType());
-				log.debug("StreamName: " + source.getName());
+        ChukwaRecordKey key = new ChukwaRecordKey();
+        ChukwaRecord record = new ChukwaRecord();
+        try {
+            int batch=0;
+            while (r.next(key, record)) {
+                    boolean isSuccessful=true;
+                    String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
+                    log.debug("Timestamp: " + record.getTime());
+                    log.debug("DataType: " + key.getReduceType());
+                    log.debug("StreamName: " + source.getName());
 		
-				String[] fields = record.getFields();
+                    String[] fields = record.getFields();
 	            String table = null;
 	            String[] priKeys = null;
 	            HashMap<String, HashMap<String, String>> hashReport = new HashMap<String ,HashMap<String, String>>();
@@ -232,6 +232,7 @@
 	            }
 	            Iterator<String> i = hashReport.keySet().iterator();
 	            while(i.hasNext()) {
+	                long currentTimeMillis = System.currentTimeMillis();
 	                Object iteratorNode = i.next();
 	                HashMap<String, String> recordSet = hashReport.get(iteratorNode);
 	                Iterator<String> fi = recordSet.keySet().iterator();
@@ -298,19 +299,28 @@
 	                          " ON DUPLICATE KEY UPDATE " + sqlValues + ";";
 	                }
 	                log.debug(sql);
-	                stmt.addBatch(sql);
+	                if(batchMode) {
+		                stmt.addBatch(sql);
+		                batch++;
+	                } else {
+	                	stmt.execute(sql);
+	                }
 	                String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
-	                long currentTimeMillis = System.currentTimeMillis();
-	                long latencyMillis = currentTimeMillis - record.getTime();
+	                long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
 	                int latencySeconds = ((int)(latencyMillis + 500)) / 1000;
+	    			if(batchMode && batch>20000) {
+	    			    int[] updateCounts = stmt.executeBatch();
+	    			    batch=0;
+	    			}
 	                log.debug(logMsg + " (" + recordType + "," + RecordUtil.getClusterName(record) +
 	                       "," + record.getTime() +
 	                       ") " + latencySeconds + " sec");	               
 	            }
 
 			}
-			@SuppressWarnings("unused")
-			int[] updateCounts = stmt.executeBatch();
+			if(batchMode) {
+			    int[] updateCounts = stmt.executeBatch();
+			}
 		} catch (SQLException ex) {
 			// handle any errors
 			log.error(ex, ex);
@@ -341,8 +351,8 @@
     
 	public static void main(String[] args) {
 		try {
-			MetricDataLoader mdl = new MetricDataLoader();
-			mdl.process(new Path(args[0]));
+			MetricDataLoader mdl = new MetricDataLoader(args[0]);
+			mdl.process(new Path(args[1]));
 		} catch(Exception e) {
 			e.printStackTrace();
 		}