You are viewing a plain text version of this content. The canonical HTML version is available through the mailing list archive.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/05/07 19:57:41 UTC

git commit: CRUNCH-203: Fix failing Trevni outputformat for hadoop-2

Updated Branches:
  refs/heads/master 56b905099 -> 6b3f2894e


CRUNCH-203: Fix failing Trevni outputformat for hadoop-2


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6b3f2894
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6b3f2894
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6b3f2894

Branch: refs/heads/master
Commit: 6b3f2894eee7697d6d82e8ee354d9d996fab3c50
Parents: 56b9050
Author: Josh Wills <jw...@apache.org>
Authored: Tue May 7 10:53:34 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Tue May 7 10:53:34 2013 -0700

----------------------------------------------------------------------
 .../crunch/io/avro/trevni/TrevniKeyTarget.java     |    9 +-
 .../crunch/io/avro/trevni/TrevniOutputFormat.java  |   40 ++++
 .../crunch/io/avro/trevni/TrevniRecordWriter.java  |  140 +++++++++++++++
 3 files changed, 184 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/6b3f2894/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
index 44d259b..2ede024 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.trevni.avro.mapreduce.AvroTrevniKeyOutputFormat;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -57,7 +56,7 @@ public class TrevniKeyTarget extends FileTargetImpl {
   }
 
   public TrevniKeyTarget(Path path, FileNamingScheme fileNamingScheme) {
-    super(path, AvroTrevniKeyOutputFormat.class, fileNamingScheme);
+    super(path, TrevniOutputFormat.class, fileNamingScheme);
   }
 
   @Override
@@ -84,11 +83,11 @@ public class TrevniKeyTarget extends FileTargetImpl {
       AvroJob.setMapOutputKeySchema(job, atype.getSchema());
 
       Avros.configureReflectDataFactory(conf);
-      configureForMapReduce(job, AvroKey.class, NullWritable.class, AvroTrevniKeyOutputFormat.class,
+      configureForMapReduce(job, AvroKey.class, NullWritable.class, TrevniOutputFormat.class,
           outputPath, name);
     } else {
-      FormatBundle<AvroTrevniKeyOutputFormat> bundle = FormatBundle.forOutput(
-          AvroTrevniKeyOutputFormat.class);
+      FormatBundle<TrevniOutputFormat> bundle = FormatBundle.forOutput(
+          TrevniOutputFormat.class);
 
       bundle.set("avro.schema.output.key", atype.getSchema().toString());
       bundle.set("mapred.output.value.groupfn.class", AvroKeyComparator.class.getName());

http://git-wip-us.apache.org/repos/asf/crunch/blob/6b3f2894/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniOutputFormat.java
new file mode 100644
index 0000000..9191788
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniOutputFormat.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ *
+ */
+public class TrevniOutputFormat<T> extends FileOutputFormat<AvroKey<T>, NullWritable> { 
+
+  /** {@inheritDoc} */
+  @Override
+  public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new TrevniRecordWriter<T>(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6b3f2894/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniRecordWriter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniRecordWriter.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniRecordWriter.java
new file mode 100644
index 0000000..74bb796
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniRecordWriter.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.trevni.ColumnFileMetaData;
+import org.apache.trevni.MetaData;
+import org.apache.trevni.avro.AvroColumnWriter;
+
+/**
+ *
+ */
+public class TrevniRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable> {
+
+  /** trevni file extension */
+  public final static String EXT = ".trv";
+  
+  /** prefix of job configs that we care about */
+  public static final String META_PREFIX = "trevni.meta.";
+  
+  /** Counter that increments as new trevni files are create because the current file 
+   * has exceeded the block size 
+   * */
+  protected int part = 0;
+
+  /** Trevni file writer */
+  protected AvroColumnWriter<T> writer;
+
+  /** This will be a unique directory linked to the task */
+  final Path dirPath;
+  
+  /** HDFS object */
+  final FileSystem fs;
+
+  /** Current configured blocksize */
+  final long blockSize;
+  
+  /** Provided avro schema from the context */
+  protected Schema schema;
+  
+  /** meta data to be stored in the output file.  */
+  protected ColumnFileMetaData meta;
+  
+  public TrevniRecordWriter(TaskAttemptContext context) throws IOException {
+    schema = initSchema(context);
+    meta = filterMetadata(context.getConfiguration());
+    writer = new AvroColumnWriter<T>(schema, meta, ReflectData.get());
+
+    Path outputPath = FileOutputFormat.getOutputPath(context);
+    
+    String dir = FileOutputFormat.getUniqueFile(context, "part", "");
+    dirPath = new Path(outputPath.toString() + "/" + dir);
+    fs = dirPath.getFileSystem(context.getConfiguration());
+    fs.mkdirs(dirPath);
+
+    blockSize = fs.getDefaultBlockSize();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(AvroKey<T> key, NullWritable value) throws IOException,
+      InterruptedException {
+    writer.write(key.datum());
+    if (writer.sizeEstimate() >= blockSize) // block full
+      flush();
+  }
+
+  /** {@inheritDoc} */
+  protected Schema initSchema(TaskAttemptContext context) {
+    boolean isMapOnly = context.getNumReduceTasks() == 0;
+    return isMapOnly ? AvroJob.getMapOutputKeySchema(context
+        .getConfiguration()) : AvroJob.getOutputKeySchema(context
+        .getConfiguration());
+  }
+  
+  /**
+   * A Trevni flush will close the current file and prep a new writer
+   * @throws IOException
+   */
+  public void flush() throws IOException {
+    OutputStream out = fs.create(new Path(dirPath, "part-" + (part++) + EXT));
+    try {
+      writer.writeTo(out);
+    } finally {
+      out.close();
+    }
+    writer = new AvroColumnWriter<T>(schema, meta, ReflectData.get());
+  }
+  
+  /** {@inheritDoc} */
+  @Override
+  public void close(TaskAttemptContext arg0) throws IOException,
+      InterruptedException {
+    flush();
+  }
+  
+  static ColumnFileMetaData filterMetadata(final Configuration configuration) {
+    final ColumnFileMetaData meta = new ColumnFileMetaData();
+    Iterator<Entry<String, String>> keyIterator = configuration.iterator();
+
+    while (keyIterator.hasNext()) {
+      Entry<String, String> confEntry = keyIterator.next();
+      if (confEntry.getKey().startsWith(META_PREFIX))
+        meta.put(confEntry.getKey().substring(META_PREFIX.length()), confEntry
+            .getValue().getBytes(MetaData.UTF8));
+    }
+
+    return meta;
+  }
+}