You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/31 23:26:42 UTC
svn commit: r502021 - in /lucene/hadoop/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: cutting
Date: Wed Jan 31 14:26:41 2007
New Revision: 502021
URL: http://svn.apache.org/viewvc?view=rev&rev=502021
Log:
HADOOP-788. Change contrib/streaming to subclass TextInputFormat. Contributed by Sanjay.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jan 31 14:26:41 2007
@@ -114,6 +114,10 @@
35. HADOOP-881. Fix JobTracker web interface to display the correct
number of task failures. (Sanjay Dahiya via cutting)
+36. HADOOP-788. Change contrib/streaming to subclass TextInputFormat,
+ permitting it to take advantage of native compression facilities.
+ (Sanjay Dahiya via cutting)
+
Release 0.10.1 - 2007-01-10
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java Wed Jan 31 14:26:41 2007
@@ -85,7 +85,7 @@
full file at a time... )
*/
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- return ((StreamInputFormat) primary_).getFullFileSplits(job);
+ return ((StreamInputFormat) primary_).getSplits(job, numSplits);
}
/**
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Wed Jan 31 14:26:41 2007
@@ -100,16 +100,6 @@
/// StreamBaseRecordReader API
- public void init() throws IOException {
- LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + " end_=" + end_ + " length_="
- + length_ + " start_ > in_.getPos() =" + (start_ > in_.getPos()) + " " + start_ + " > "
- + in_.getPos());
- if (start_ > in_.getPos()) {
- in_.seek(start_);
- }
- seekNextRecordBoundary();
- }
-
/** Implementation should seek forward in_ to the first byte of the next record.
* The initial byte offset in the stream is arbitrary.
*/
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Wed Jan 31 14:26:41 2007
@@ -27,6 +27,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
@@ -34,7 +35,7 @@
* selects a RecordReader based on a JobConf property.
* @author Michel Tourn
*/
-public class StreamInputFormat extends InputFormatBase {
+public class StreamInputFormat extends TextInputFormat {
// an InputFormat should be public with the synthetic public default constructor
// JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader)
@@ -54,7 +55,6 @@
return super.getSplits(job, numSplits);
}
}
-
/** For the compressed-files case: override InputFormatBase to produce one split. */
FileSplit[] getFullFileSplits(JobConf job) throws IOException {
Path[] files = listPaths(job);
@@ -79,9 +79,8 @@
final long start = split.getStart();
final long end = start + split.getLength();
- String splitName = split.getPath() + ":" + start + "-" + end;
- final FSDataInputStream in = fs.open(split.getPath());
-
+ FSDataInputStream in = fs.open(split.getPath());
+
// will open the file and seek to the start of the split
// Factory dispatch based on available params..
Class readerClass;
@@ -103,15 +102,13 @@
throw new RuntimeException(nsm);
}
- StreamBaseRecordReader reader;
+ RecordReader reader;
try {
- reader = (StreamBaseRecordReader) ctor.newInstance(new Object[] { in, split, reporter, job,
+ reader = (RecordReader) ctor.newInstance(new Object[] { in, split, reporter, job,
fs });
} catch (Exception nsm) {
throw new RuntimeException(nsm);
}
-
- reader.init();
if (reader instanceof StreamSequenceRecordReader) {
// override k/v class types with types stored in SequenceFile
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Wed Jan 31 14:26:41 2007
@@ -19,59 +19,65 @@
package org.apache.hadoop.streaming;
import java.io.*;
-import java.nio.charset.MalformedInputException;
-import java.util.Arrays;
-import java.util.zip.GZIPInputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
/**
* Similar to org.apache.hadoop.mapred.TextRecordReader,
* but delimits key and value with a TAB.
* @author Michel Tourn
*/
-public class StreamLineRecordReader extends StreamBaseRecordReader {
+public class StreamLineRecordReader extends LineRecordReader {
+
+ private String splitName;
+ private Reporter reporter;
+ private FileSplit split;
+ private int numRec = 0;
+ private int nextStatusRec = 1;
+ private int statusMaxRecordChars;
+ protected static final Log LOG = LogFactory.getLog(StreamLineRecordReader.class);
+ // base class uses LongWritable as key, use this.
+ private WritableComparable dummyKey = super.createKey();
+ private Text innerValue = (Text)super.createValue();
+  /**
+   * Creates a reader over one split whose lines are read by LineRecordReader.
+   * The stream handed to the base class comes from createStream(in, job).
+   * The split and the reporter are kept for status reporting.
+   */
- public StreamLineRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
+  public StreamLineRecordReader(FSDataInputStream in, FileSplit split,
+                                Reporter reporter,
JobConf job, FileSystem fs) throws IOException {
- super(in, split, reporter, job, fs);
- gzipped_ = StreamInputFormat.isGzippedInput(job);
- if (gzipped_) {
- din_ = new BufferedInputStream( (new GZIPInputStream(in_) ) );
- } else {
- din_ = in_;
- }
+    // LineRecordReader advances its position by the bytes it reads from the
+    // stream it is given. For gzipped input those are decoded bytes, so the
+    // compressed split length is not a usable end offset: read to end of
+    // stream instead, as TextInputFormat.getRecordReader does with a codec.
+    super(createStream(in, job), split.getStart(),
+          StreamInputFormat.isGzippedInput(job)
+            ? Long.MAX_VALUE : split.getStart() + split.getLength());
+    this.split = split ;
+    this.reporter = reporter ;
}
-
- public void seekNextRecordBoundary() throws IOException {
- if (gzipped_) {
- // no skipping: use din_ as-is
- // assumes splitter created only one split per file
- return;
- } else {
- int bytesSkipped = 0;
- if (start_ != 0) {
- in_.seek(start_ - 1);
- // scan to the next newline in the file
- while (in_.getPos() < end_) {
- char c = (char) in_.read();
- bytesSkipped++;
- if (c == '\r' || c == '\n') {
- break;
- }
- }
- }
-
- //System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);
- }
+
+  /**
+   * Picks the stream the line reader consumes: in itself, or in wrapped by
+   * GzipCodec.createInputStream when StreamInputFormat.isGzippedInput(job) holds.
+   */
+  private static InputStream createStream(FSDataInputStream in, JobConf job)
+    throws IOException{
+    if (!StreamInputFormat.isGzippedInput(job)) {
+      return in;
+    }
+    GzipCodec gzip = new GzipCodec();
+    gzip.setConf(job);  // the codec is given the job configuration before use
+    return gzip.createInputStream(in);
+  }
+
+ /** Creates a Text key; the base LineRecordReader creates LongWritable keys instead. */
+ public WritableComparable createKey() {
+ return new Text();
+ }
+
+ /** Creates an empty Text value. */
+ public Writable createValue() {
+ return new Text();
}
public synchronized boolean next(Writable key, Writable value) throws IOException {
@@ -86,14 +92,12 @@
Text tKey = (Text) key;
Text tValue = (Text) value;
- byte[] line;
-
- if ( !gzipped_ ) {
- long pos = in_.getPos();
- if (pos >= end_) return false;
+ byte[] line = null ;
+ if( super.next(dummyKey, innerValue) ){
+ line = innerValue.getBytes();
+ }else{
+ return false;
}
-
- line = UTF8ByteArrayUtils.readLine((InputStream) din_);
if (line == null) return false;
int tab = UTF8ByteArrayUtils.findTab(line);
if (tab == -1) {
@@ -105,7 +109,35 @@
numRecStats(line, 0, line.length);
return true;
}
+
+  /**
+   * Counts one more record and, when the count reaches nextStatusRec, logs a
+   * status line and passes it to the reporter. nextStatusRec starts at 1 and
+   * advances by 100 each time, so records 1, 101, 201, ... are reported.
+   * At most statusMaxRecordChars bytes of the record, decoded as UTF-8, are
+   * included in the status.
+   * NOTE(review): no assignment to statusMaxRecordChars is visible in this
+   * change; while it is 0 the record excerpt is always empty -- confirm.
+   */
+  private void numRecStats(byte[] record, int start, int len) throws IOException {
+    if (++numRec != nextStatusRec) {
+      return;
+    }
+    int excerptLen = Math.min(len, statusMaxRecordChars);
+    String excerpt = new String(record, start, excerptLen, "UTF-8");
+    nextStatusRec += 100;  // linear schedule: next report 100 records later
+    String line = getStatus(excerpt);
+    LOG.info(line);
+    reporter.setStatus(line);
+  }
- boolean gzipped_;
- InputStream din_; // GZIP or plain
+  /**
+   * Builds the status line for the current record: host, record count, reader
+   * position, the split's file name, start and length, the record text (cut to
+   * statusMaxRecordChars characters plus "..." when longer) and splitName.
+   * The position is shown as -1 when getPos() throws.
+   * NOTE(review): no assignment to splitName is visible in this change, so it
+   * may render as "null" -- confirm.
+   */
+  private String getStatus(CharSequence record) {
+    long pos;
+    try {
+      pos = getPos();
+    } catch (IOException ignored) {
+      // position unavailable: report -1 rather than propagate
+      pos = -1;
+    }
+    String recStr = record.length() > statusMaxRecordChars
+        ? record.subSequence(0, statusMaxRecordChars) + "..."
+        : record.toString();
+    String unqualSplit = split.getFile().getName() + ":" + split.getStart() + "+"
+        + split.getLength();
+    return "HSTR " + StreamUtil.HOST + " " + numRec + ". pos=" + pos + " " + unqualSplit
+        + " Processing record=" + recStr + " " + splitName;
+  }
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Wed Jan 31 14:26:41 2007
@@ -62,8 +62,19 @@
beginPat_ = makePatternCDataOrMark(beginMark_);
endPat_ = makePatternCDataOrMark(endMark_);
}
+ init();
}
+ /**
+ * Positions in_ for this reader: logs the split bounds and the current stream
+ * position, seeks forward to start_ when the stream is still before it, then
+ * calls seekNextRecordBoundary().
+ * NOTE(review): the log prefix still names StreamBaseRecordReader, where this
+ * method lived before this change -- confirm whether that is intended.
+ */
+ public void init() throws IOException {
+ LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + " end_=" + end_ + " length_="
+ + length_ + " start_ > in_.getPos() =" + (start_ > in_.getPos()) + " " + start_ + " > "
+ + in_.getPos());
+ if (start_ > in_.getPos()) {
+ in_.seek(start_);
+ }
+ seekNextRecordBoundary();
+ }
+
int numNext = 0;
public synchronized boolean next(Writable key, Writable value) throws IOException {
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java?view=auto&rev=502021
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java Wed Jan 31 14:26:41 2007
@@ -0,0 +1,127 @@
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Reads lines from a stream between two positions. Each key is the position
+ * at which a line starts; each value is the line without its terminator.
+ * @author sanjaydahiya
+ */
+public class LineRecordReader implements RecordReader {
+  private long start;
+  private long pos;
+  private long end;
+  private BufferedInputStream in;
+  private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
+  /**
+   * Provide a bridge to get the bytes from the ByteArrayOutputStream
+   * without creating a new byte array.
+   */
+  private static class TextStuffer extends OutputStream {
+    public Text target;
+    public void write(int b) {
+      throw new UnsupportedOperationException("write(byte) not supported");
+    }
+    public void write(byte[] data, int offset, int len) throws IOException {
+      target.set(data, offset, len);
+    }
+  }
+  private TextStuffer bridge = new TextStuffer();
+
+  /**
+   * @param in stream to read lines from; it is wrapped in a BufferedInputStream
+   * @param offset position reported for the first line
+   * @param endOffset position at or beyond which no further line is started
+   */
+  public LineRecordReader(InputStream in, long offset, long endOffset)
+    throws IOException{
+    this.in = new BufferedInputStream(in);
+    this.start = offset;
+    this.pos = offset;
+    this.end = endOffset;
+  }
+
+  public WritableComparable createKey() {
+    return new LongWritable();
+  }
+
+  public Writable createValue() {
+    return new Text();
+  }
+
+  /**
+   * Read the next line into value and the position it starts at into key.
+   * @return false once pos has reached end or the stream has no more bytes
+   */
+  public synchronized boolean next(Writable key, Writable value)
+    throws IOException {
+    if (pos >= end)
+      return false;
+
+    ((LongWritable)key).set(pos);  // key is position
+    buffer.reset();
+    long bytesRead = readLine(in, buffer);
+    if (bytesRead == 0) {
+      return false;
+    }
+    pos += bytesRead;
+    bridge.target = (Text) value;
+    buffer.writeTo(bridge);  // hands the buffered bytes to value via TextStuffer
+    return true;
+  }
+
+  /**
+   * Copy one line from in to out, dropping the terminator ("\n", "\r" or
+   * "\r\n"). Nothing is copied when out is null, which skips the line.
+   * A "\r" is followed by a one-byte look-ahead using mark/reset on in.
+   * @return the number of bytes consumed, terminator included; 0 only when
+   *         the stream was already at its end
+   */
+  public static long readLine(InputStream in,
+                              OutputStream out) throws IOException {
+    long bytes = 0;
+    while (true) {
+
+      int b = in.read();
+      if (b == -1) {
+        break;
+      }
+      bytes += 1;
+
+      byte c = (byte)b;
+      if (c == '\n') {
+        break;
+      }
+
+      if (c == '\r') {
+        // peek one byte ahead so that "\r\n" counts as a single terminator
+        in.mark(1);
+        byte nextC = (byte)in.read();
+        if (nextC != '\n') {
+          in.reset();
+        } else {
+          bytes += 1;
+        }
+        break;
+      }
+
+      if (out != null) {
+        out.write(c);
+      }
+    }
+    return bytes;
+  }
+
+  /**
+   * Get the progress within the split, as the fraction of the range from
+   * start to end that has been read, capped at 1.0. Returns 0.0 for an
+   * empty range.
+   */
+  public float getProgress() {
+    if (start == end) {
+      return 0.0f;
+    }
+    // divide as float: long / long would truncate every partial value to 0
+    return Math.min(1.0f, (pos - start) / (float)(end - start));
+  }
+
+  public synchronized long getPos() throws IOException {
+    return pos;
+  }
+
+  public synchronized void close() throws IOException {
+    in.close();
+  }
+}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java Wed Jan 31 14:26:41 2007
@@ -21,7 +21,6 @@
import java.io.*;
import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
/** An {@link InputFormat} for plain text files. Files are broken into lines.
@@ -39,81 +38,6 @@
return compressionCodecs.getCodec(file) == null;
}
- protected static class LineRecordReader implements RecordReader {
- private long start;
- private long pos;
- private long end;
- private BufferedInputStream in;
- private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
- /**
- * Provide a bridge to get the bytes from the ByteArrayOutputStream
- * without creating a new byte array.
- */
- private static class TextStuffer extends OutputStream {
- public Text target;
- public void write(int b) {
- throw new UnsupportedOperationException("write(byte) not supported");
- }
- public void write(byte[] data, int offset, int len) throws IOException {
- target.set(data, offset, len);
- }
- }
- private TextStuffer bridge = new TextStuffer();
-
- public LineRecordReader(InputStream in, long offset, long endOffset) {
- this.in = new BufferedInputStream(in);
- this.start = offset;
- this.pos = offset;
- this.end = endOffset;
- }
-
- public WritableComparable createKey() {
- return new LongWritable();
- }
-
- public Writable createValue() {
- return new Text();
- }
-
- /**
- * Get the progress within the split
- */
- public float getProgress() {
- if (start == end) {
- return 0.0f;
- } else {
- return (pos - start) / (end - start);
- }
- }
-
- /** Read a line. */
- public synchronized boolean next(Writable key, Writable value)
- throws IOException {
- if (pos >= end)
- return false;
-
- ((LongWritable)key).set(pos); // key is position
- buffer.reset();
- long bytesRead = readLine(in, buffer);
- if (bytesRead == 0) {
- return false;
- }
- pos += bytesRead;
- bridge.target = (Text) value;
- buffer.writeTo(bridge);
- return true;
- }
-
- public synchronized long getPos() throws IOException {
- return pos;
- }
-
- public synchronized void close() throws IOException {
- in.close();
- }
-
- }
-
public RecordReader getRecordReader(InputSplit genericSplit,
JobConf job, Reporter reporter)
throws IOException {
@@ -129,52 +53,16 @@
FileSystem fs = FileSystem.get(job);
FSDataInputStream fileIn = fs.open(split.getPath());
InputStream in = fileIn;
-
if (codec != null) {
in = codec.createInputStream(fileIn);
end = Long.MAX_VALUE;
} else if (start != 0) {
fileIn.seek(start-1);
- readLine(fileIn, null);
+ LineRecordReader.readLine(fileIn, null);
start = fileIn.getPos();
}
return new LineRecordReader(in, start, end);
}
-
- public static long readLine(InputStream in,
- OutputStream out) throws IOException {
- long bytes = 0;
- while (true) {
-
- int b = in.read();
- if (b == -1) {
- break;
- }
- bytes += 1;
-
- byte c = (byte)b;
- if (c == '\n') {
- break;
- }
-
- if (c == '\r') {
- in.mark(1);
- byte nextC = (byte)in.read();
- if (nextC != '\n') {
- in.reset();
- } else {
- bytes += 1;
- }
- break;
- }
-
- if (out != null) {
- out.write(c);
- }
- }
- return bytes;
- }
-
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Wed Jan 31 14:26:41 2007
@@ -130,14 +130,14 @@
public void testUTF8() throws Exception {
InputStream in = makeStream("abcd\u20acbdcd\u20ac");
ByteArrayOutputStream out = new ByteArrayOutputStream();
- TextInputFormat.readLine(in, out);
+ LineRecordReader.readLine(in, out);
Text line = new Text();
line.set(out.toByteArray());
assertEquals("readLine changed utf8 characters",
"abcd\u20acbdcd\u20ac", line.toString());
in = makeStream("abc\u200axyz");
out.reset();
- TextInputFormat.readLine(in, out);
+ LineRecordReader.readLine(in, out);
line.set(out.toByteArray());
assertEquals("split on fake newline", "abc\u200axyz", line.toString());
}
@@ -145,24 +145,24 @@
public void testNewLines() throws Exception {
InputStream in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
ByteArrayOutputStream out = new ByteArrayOutputStream();
- TextInputFormat.readLine(in, out);
+ LineRecordReader.readLine(in, out);
assertEquals("line1 length", 1, out.size());
out.reset();
- TextInputFormat.readLine(in, out);
+ LineRecordReader.readLine(in, out);
assertEquals("line2 length", 2, out.size());
out.reset();
- TextInputFormat.readLine(in, out);
+ LineRecordReader.readLine(in, out);
assertEquals("line3 length", 0, out.size());
out.reset();
- TextInputFormat.readLine(in, out);
+ LineRecordReader.readLine(in, out);
assertEquals("line4 length", 3, out.size());
out.reset();
- TextInputFormat.readLine(in, out);
+ LineRecordReader.readLine(in, out);
assertEquals("line5 length", 4, out.size());
out.reset();
- TextInputFormat.readLine(in, out);
+ LineRecordReader.readLine(in, out);
assertEquals("line5 length", 5, out.size());
- assertEquals("end of file", 0, TextInputFormat.readLine(in, out));
+ assertEquals("end of file", 0, LineRecordReader.readLine(in, out));
}
private static void writeFile(FileSystem fs, Path name,