You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2009/02/27 19:23:01 UTC
svn commit: r748628 - in /hadoop/core/trunk/src/contrib/chukwa: bin/mdl.sh
src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java
Author: eyang
Date: Fri Feb 27 18:23:01 2009
New Revision: 748628
URL: http://svn.apache.org/viewvc?rev=748628&view=rev
Log:
HADOOP-5029. Added mdl script to manually load chukwa sequence file to database.
Added:
hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java
Added: hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh?rev=748628&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh (added)
+++ hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh Fri Feb 27 18:23:01 2009
@@ -0,0 +1,28 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# mdl.sh: manually run MetricDataLoader; usage: mdl.sh <cluster name> <chukwa sequence file>
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+. "$bin"/chukwa-config.sh
+# At least two arguments are required; all arguments are forwarded unchanged to MetricDataLoader.
+if [ $# -lt 2 ]; then
+ echo "Usage: mdl.sh <cluster name> <chukwa sequence file>"
+ echo ""
+ exit 1
+fi
+# Expansions are quoted so values with spaces survive. NOTE(review): /homes/eyang/chukwa-core-0.1.1.jar is a developer-local path - confirm the intended jar.
+"${JAVA_HOME}/bin/java" -Xms2048M -Xmx3096M "-DDATACONFIG=${CHUKWA_CONF_DIR}/mdl.xml" "-Djava.library.path=${JAVA_LIBRARY_PATH}" "-DCHUKWA_HOME=${CHUKWA_HOME}" "-DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR}" "-DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR}" -DAPP=MDL -Dlog4j.configuration=chukwa-log4j.properties -classpath "${CLASSPATH}:/homes/eyang/chukwa-core-0.1.1.jar:${HADOOP_JAR}:${COMMON}:${tools}:${CHUKWA_HOME}/conf" org.apache.hadoop.chukwa.extraction.database.MetricDataLoader "$@"
+
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java?rev=748628&r1=748627&r2=748628&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java Fri Feb 27 18:23:01 2009
@@ -57,6 +57,7 @@
private static HashMap<String, String> dbTables = null;
private HashMap<String, HashMap<String,Integer>> dbSchema = null;
private static String newSpace="-";
+ private static boolean batchMode = true;
/** Creates a new instance of MetricDataLoader */
public MetricDataLoader() {
@@ -146,31 +147,30 @@
public void process(Path source) throws IOException, URISyntaxException, SQLException {
- System.out.println("Input file:" + source.getName());
+ System.out.println("Input file:" + source.getName());
- ChukwaConfiguration conf = new ChukwaConfiguration();
- String fsName = conf.get("writer.hdfs.filesystem");
- FileSystem fs = FileSystem.get(new URI(fsName), conf);
+ ChukwaConfiguration conf = new ChukwaConfiguration();
+ String fsName = conf.get("writer.hdfs.filesystem");
+ FileSystem fs = FileSystem.get(new URI(fsName), conf);
- SequenceFile.Reader r =
+ SequenceFile.Reader r =
new SequenceFile.Reader(fs,source, conf);
stmt = conn.createStatement();
conn.setAutoCommit(false);
- ChukwaRecordKey key = new ChukwaRecordKey();
- ChukwaRecord record = new ChukwaRecord();
- try
- {
- while (r.next(key, record))
- {
- boolean isSuccessful=true;
- String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
- log.debug("Timestamp: " + record.getTime());
- log.debug("DataType: " + key.getReduceType());
- log.debug("StreamName: " + source.getName());
+ ChukwaRecordKey key = new ChukwaRecordKey();
+ ChukwaRecord record = new ChukwaRecord();
+ try {
+ int batch=0;
+ while (r.next(key, record)) {
+ boolean isSuccessful=true;
+ String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
+ log.debug("Timestamp: " + record.getTime());
+ log.debug("DataType: " + key.getReduceType());
+ log.debug("StreamName: " + source.getName());
- String[] fields = record.getFields();
+ String[] fields = record.getFields();
String table = null;
String[] priKeys = null;
HashMap<String, HashMap<String, String>> hashReport = new HashMap<String ,HashMap<String, String>>();
@@ -232,6 +232,7 @@
}
Iterator<String> i = hashReport.keySet().iterator();
while(i.hasNext()) {
+ long currentTimeMillis = System.currentTimeMillis();
Object iteratorNode = i.next();
HashMap<String, String> recordSet = hashReport.get(iteratorNode);
Iterator<String> fi = recordSet.keySet().iterator();
@@ -298,19 +299,28 @@
" ON DUPLICATE KEY UPDATE " + sqlValues + ";";
}
log.debug(sql);
- stmt.addBatch(sql);
+ if(batchMode) {
+ stmt.addBatch(sql);
+ batch++;
+ } else {
+ stmt.execute(sql);
+ }
String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
- long currentTimeMillis = System.currentTimeMillis();
- long latencyMillis = currentTimeMillis - record.getTime();
+ long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
int latencySeconds = ((int)(latencyMillis + 500)) / 1000;
+ if(batchMode && batch>20000) {
+ int[] updateCounts = stmt.executeBatch();
+ batch=0;
+ }
log.debug(logMsg + " (" + recordType + "," + RecordUtil.getClusterName(record) +
"," + record.getTime() +
") " + latencySeconds + " sec");
}
}
- @SuppressWarnings("unused")
- int[] updateCounts = stmt.executeBatch();
+ if(batchMode) {
+ int[] updateCounts = stmt.executeBatch();
+ }
} catch (SQLException ex) {
// handle any errors
log.error(ex, ex);
@@ -341,8 +351,8 @@
public static void main(String[] args) {
try {
- MetricDataLoader mdl = new MetricDataLoader();
- mdl.process(new Path(args[0]));
+ MetricDataLoader mdl = new MetricDataLoader(args[0]);
+ mdl.process(new Path(args[1]));
} catch(Exception e) {
e.printStackTrace();
}