You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by nz...@apache.org on 2011/08/13 07:14:39 UTC
svn commit: r1157323 -
/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/RCFileCat.java
Author: nzhang
Date: Sat Aug 13 05:14:38 2011
New Revision: 1157323
URL: http://svn.apache.org/viewvc?rev=1157323&view=rev
Log:
HIVE-2370. Improve RCFileCat performance significantly (Tim Armstrong via Ning Zhang)
Modified:
hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/RCFileCat.java
Modified: hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/RCFileCat.java
URL: http://svn.apache.org/viewvc/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/RCFileCat.java?rev=1157323&r1=1157322&r2=1157323&view=diff
==============================================================================
--- hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/RCFileCat.java (original)
+++ hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/RCFileCat.java Sat Aug 13 05:14:38 2011
@@ -18,6 +18,16 @@
package org.apache.hadoop.hive.cli;
+import java.io.BufferedOutputStream;
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -31,10 +41,29 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+
public class RCFileCat implements Tool{
-
+ // Initial capacity of the string buffer, in chars (StringBuilder capacity counts chars, not bytes)
+ private static final int STRING_BUFFER_SIZE = 16 * 1024;
+ // Flush the string buffer to stdout once its length exceeds this many chars
+ private static final int STRING_BUFFER_FLUSH_SIZE = 14 * 1024;
+
+ // Size of stdout buffer in bytes
+ private static final int STDOUT_BUFFER_SIZE = 128 * 1024;
+ // In verbose mode, print an update per RECORD_PRINT_INTERVAL records
+ private static final int RECORD_PRINT_INTERVAL = (1024*1024);
+
+ public RCFileCat() { // Creates the tool and (re)initializes the UTF-8 decoder used when printing column bytes.
+ super();
+ decoder = Charset.forName("UTF-8").newDecoder(). // NOTE(review): decoder is a static field, so every new instance replaces it for all instances
+ onMalformedInput(CodingErrorAction.REPLACE). // malformed byte sequences are replaced rather than reported as errors
+ onUnmappableCharacter(CodingErrorAction.REPLACE); // same policy for unmappable characters
+ }
+
+ private static CharsetDecoder decoder;
+
Configuration conf = null;
-
+
private static String TAB ="\t";
private static String NEWLINE ="\r\n";
@@ -42,7 +71,10 @@ public class RCFileCat implements Tool{
public int run(String[] args) throws Exception {
long start = 0l;
long length = -1l;
-
+ int recordCount = 0;
+ long startT = System.currentTimeMillis();
+ boolean verbose = false;
+
//get options from arguments
if (args.length < 1 || args.length > 3) {
printUsage(null);
@@ -54,13 +86,16 @@ public class RCFileCat implements Tool{
start = Long.parseLong(arg.substring("--start=".length()));
} else if (arg.startsWith("--length=")) {
length = Long.parseLong(arg.substring("--length=".length()));
+ } else if (arg.equals("--verbose")) {
+ verbose = true;
} else if (fileName == null){
fileName = new Path(arg);
} else {
printUsage(null);
}
}
-
+
+ setupBufferedOutput();
FileSystem fs = FileSystem.get(fileName.toUri(), conf);
long fileLen = fs.getFileStatus(fileName).getLen();
if (start < 0) {
@@ -72,29 +107,61 @@ public class RCFileCat implements Tool{
if (length < 0 || (start + length) > fileLen) {
length = fileLen - start;
}
-
+
//share the code with RecordReader.
FileSplit split = new FileSplit(fileName,start, length, new JobConf(conf));
RCFileRecordReader recordReader = new RCFileRecordReader(conf, split);
LongWritable key = new LongWritable();
BytesRefArrayWritable value = new BytesRefArrayWritable();
- Text txt = new Text();
+ StringBuilder buf = new StringBuilder(STRING_BUFFER_SIZE); // extra capacity in case we overrun, to avoid resizing
while (recordReader.next(key, value)) {
- txt.clear();
- for (int i = 0; i < value.size(); i++) {
- BytesRefWritable v = value.get(i);
- txt.set(v.getData(), v.getStart(), v.getLength());
- System.out.print(txt.toString());
- if (i < value.size() - 1) {
- // do not put the TAB for the last column
- System.out.print(RCFileCat.TAB);
- }
+ printRecord(value, buf);
+ recordCount++;
+ if (verbose && (recordCount % RECORD_PRINT_INTERVAL) == 0) {
+ long now = System.currentTimeMillis();
+ System.err.println("Read " + recordCount/1024 + "k records");
+ System.err.println("Read " + ((recordReader.getPos() / (1024L*1024L)))
+ + "MB");
+ System.err.printf("Input scan rate %.2f MB/s\n",
+ (recordReader.getPos() * 1.0 / (now - startT)) / 1024.0);
+ }
+ if (buf.length() > STRING_BUFFER_FLUSH_SIZE) {
+ System.out.print(buf.toString());
+ buf.setLength(0);
}
- System.out.print(RCFileCat.NEWLINE);
}
+ // print out last part of buffer
+ System.out.print(buf.toString());
+ System.out.flush();
return 0;
}
+
+ /**
+ * Appends one record to the string builder: decoded columns separated by TAB, terminated by NEWLINE.
+ * @param value the columns of the record; each column's bytes are decoded with the class's decoder
+ * @param buf the builder the text is appended to; this method only appends and never flushes or clears it
+ * @throws IOException propagated from the calls made while reading and decoding a column
+ */
+ private void printRecord(BytesRefArrayWritable value, StringBuilder buf)
+ throws IOException {
+ int n = value.size();
+ if (n > 0) { // a record with zero columns appends nothing, not even NEWLINE
+ BytesRefWritable v = value.unCheckedGet(0);
+ ByteBuffer bb = ByteBuffer.wrap(v.getData(), v.getStart(), v.getLength());
+ buf.append(decoder.decode(bb));
+ for (int i = 1; i < n; i++) {
+ // TAB goes before every column after the first, so the last column has no trailing TAB
+ buf.append(RCFileCat.TAB);
+
+ v = value.unCheckedGet(i);
+ bb = ByteBuffer.wrap(v.getData(), v.getStart(), v.getLength());
+ buf.append(decoder.decode(bb));
+ }
+ buf.append(RCFileCat.NEWLINE);
+ }
+ }
+
@Override
public Configuration getConf() {
return conf;
@@ -104,26 +171,39 @@ public class RCFileCat implements Tool{
public void setConf(Configuration conf) { // Stores the configuration that run() later passes to FileSystem.get, JobConf and RCFileRecordReader.
this.conf = conf;
}
-
- private static String Usage = "RCFileCat [--start=start_offet] [--length=len] fileName";
-
+
+ private static String Usage = "RCFileCat [--start=start_offet] [--length=len] [--verbose] fileName";
+
public static void main(String[] args) { // Command-line entry point: runs an RCFileCat instance through ToolRunner with a fresh Configuration.
try {
+
Configuration conf = new Configuration();
RCFileCat instance = new RCFileCat();
instance.setConf(conf);
+
ToolRunner.run(instance, args); // NOTE(review): the int returned by ToolRunner.run is discarded here
} catch (Exception e) {
+ e.printStackTrace();
+ System.err.println("\n\n\n"); // blank lines separating the stack trace from the usage text printed next
printUsage(e.getMessage()); // printUsage ends with System.exit(1), so any exception terminates the process with status 1
}
}
-
+
+ private static void setupBufferedOutput() { // Replaces System.out with a PrintStream buffered at STDOUT_BUFFER_SIZE bytes over the stdout descriptor.
+ FileOutputStream fdout =
+ new FileOutputStream(FileDescriptor.out); // stream over the standard output file descriptor
+ BufferedOutputStream bos =
+ new BufferedOutputStream(fdout, STDOUT_BUFFER_SIZE);
+ PrintStream ps =
+ new PrintStream(bos, false); // autoflush disabled: callers must flush explicitly (run() does so before returning)
+ System.setOut(ps); // takes effect for every later use of System.out in this JVM
+ }
private static void printUsage(String errorMsg) { // Prints the usage string, plus errorMsg when non-null, then exits; this method never returns.
- System.out.println(Usage);
+ System.err.println(Usage);
if(errorMsg != null) {
- System.out.println(errorMsg);
+ System.err.println(errorMsg);
}
System.exit(1); // always terminates the JVM with status 1, including when called with errorMsg == null
}
-
+
}