Posted to commits@hive.apache.org by cw...@apache.org on 2014/06/23 12:17:29 UTC

svn commit: r1604731 - /hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/

Author: cws
Date: Mon Jun 23 10:17:29 2014
New Revision: 1604731

URL: http://svn.apache.org/r1604731
Log:
HIVE-7094: Separate out static/dynamic partitioning code in FileRecordWriterContainer (David Chen via cws)

Added:
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java
Modified:
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java
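
At a glance, the refactoring moves the two write paths that used to be interleaved inside FileRecordWriterContainer into dedicated subclasses, and FileOutputFormatContainer.getRecordWriter() now simply picks one of them. A minimal sketch of that dispatch, paraphrased from the diff below (the surrounding isDynamicPartitioningUsed() check is not visible in the hunk, so treat it as an assumption; a helper like this would have to live in org.apache.hive.hcatalog.mapreduce because the container classes are package-private, and the usual Hadoop/HCatalog imports are assumed):

    // Hypothetical helper paraphrasing the dispatch in FileOutputFormatContainer.getRecordWriter().
    static RecordWriterContainer chooseWriterContainer(
        TaskAttemptContext context,
        org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter)
        throws IOException, InterruptedException {
      OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration());
      if (jobInfo.isDynamicPartitioningUsed()) {
        // Dynamic partitioning: per-partition writers are created lazily once the partition
        // values are known, so no base RecordWriter is passed in.
        return new DynamicPartitionFileRecordWriterContainer(null, context);
      }
      // Static partitioning: a single base RecordWriter handles the whole task output.
      return new StaticPartitionFileRecordWriterContainer(baseWriter, context);
    }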

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java?rev=1604731&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java Mon Jun 23 10:17:29 2014
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.hcatalog.common.ErrorType;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+/**
+ * Record writer container for tables using dynamic partitioning. See
+ * {@link FileOutputFormatContainer} for more information.
+ */
+class DynamicPartitionFileRecordWriterContainer extends FileRecordWriterContainer {
+  private final List<Integer> dynamicPartCols;
+  private int maxDynamicPartitions;
+
+  private final Map<String, RecordWriter<? super WritableComparable<?>, ? super Writable>> baseDynamicWriters;
+  private final Map<String, SerDe> baseDynamicSerDe;
+  private final Map<String, org.apache.hadoop.mapred.OutputCommitter> baseDynamicCommitters;
+  private final Map<String, org.apache.hadoop.mapred.TaskAttemptContext> dynamicContexts;
+  private final Map<String, ObjectInspector> dynamicObjectInspectors;
+  private Map<String, OutputJobInfo> dynamicOutputJobInfo;
+
+  /**
+   * @param baseWriter RecordWriter to contain
+   * @param context current TaskAttemptContext
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public DynamicPartitionFileRecordWriterContainer(
+      RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    super(baseWriter, context);
+    maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
+    dynamicPartCols = jobInfo.getPosOfDynPartCols();
+    if (dynamicPartCols == null) {
+      throw new HCatException("It seems that setSchema() is not called on "
+          + "HCatOutputFormat. Please make sure that method is called.");
+    }
+
+    this.baseDynamicSerDe = new HashMap<String, SerDe>();
+    this.baseDynamicWriters =
+        new HashMap<String, RecordWriter<? super WritableComparable<?>, ? super Writable>>();
+    this.baseDynamicCommitters = new HashMap<String, org.apache.hadoop.mapred.OutputCommitter>();
+    this.dynamicContexts = new HashMap<String, org.apache.hadoop.mapred.TaskAttemptContext>();
+    this.dynamicObjectInspectors = new HashMap<String, ObjectInspector>();
+    this.dynamicOutputJobInfo = new HashMap<String, OutputJobInfo>();
+  }
+
+  @Override
+  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+    Reporter reporter = InternalUtil.createReporter(context);
+    for (RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters
+        .values()) {
+      // We are in RecordWriter.close(), so it makes sense that the context here
+      // would be a TaskInputOutputContext.
+      bwriter.close(reporter);
+    }
+    for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry : baseDynamicCommitters
+        .entrySet()) {
+      org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
+      OutputCommitter baseOutputCommitter = entry.getValue();
+      if (baseOutputCommitter.needsTaskCommit(currContext)) {
+        baseOutputCommitter.commitTask(currContext);
+      }
+    }
+  }
+
+  @Override
+  protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOException, HCatException {
+    OutputJobInfo localJobInfo = null;
+    // Calculate which writer to use from the remaining values - this needs to
+    // be done before we delete cols.
+    List<String> dynamicPartValues = new ArrayList<String>();
+    for (Integer colToAppend : dynamicPartCols) {
+      dynamicPartValues.add(value.get(colToAppend).toString());
+    }
+
+    String dynKey = dynamicPartValues.toString();
+    if (!baseDynamicWriters.containsKey(dynKey)) {
+      if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) {
+        throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
+            "Number of dynamic partitions being created "
+                + "exceeds configured max allowable partitions[" + maxDynamicPartitions
+                + "], increase parameter [" + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+                + "] if needed.");
+      }
+
+      org.apache.hadoop.mapred.TaskAttemptContext currTaskContext =
+          HCatMapRedUtil.createTaskAttemptContext(context);
+      configureDynamicStorageHandler(currTaskContext, dynamicPartValues);
+      localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext.getConfiguration());
+
+      // Set up the SerDe.
+      SerDe currSerDe =
+          ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf());
+      try {
+        InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(),
+            localJobInfo);
+      } catch (SerDeException e) {
+        throw new IOException("Failed to initialize SerDe", e);
+      }
+
+      // Create the base OutputFormat.
+      org.apache.hadoop.mapred.OutputFormat baseOF =
+          ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(),
+              currTaskContext.getJobConf());
+
+      // We skip calling checkOutputSpecs() for each partition, because it can
+      // throw a FileAlreadyExistsException when more than one mapper is
+      // writing to the same partition (see HCATALOG-490), and skipping it
+      // also avoids contacting the namenode for each new FileOutputFormat
+      // instance.
+      // In general this should be fine for most FileOutputFormat
+      // implementations, but it may become an issue for cases where the
+      // method is used to perform other setup tasks.
+
+      // Get Output Committer
+      org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter =
+          currTaskContext.getJobConf().getOutputCommitter();
+
+      // Create currJobContext last so it picks up all the config changes.
+      org.apache.hadoop.mapred.JobContext currJobContext =
+          HCatMapRedUtil.createJobContext(currTaskContext);
+
+      // Set up job.
+      baseOutputCommitter.setupJob(currJobContext);
+
+      // Recreate to refresh jobConf of currTask context.
+      currTaskContext =
+          HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
+              currTaskContext.getTaskAttemptID(), currTaskContext.getProgressible());
+
+      // Set temp location.
+      currTaskContext.getConfiguration().set(
+          "mapred.work.output.dir",
+          new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext)
+              .getWorkPath().toString());
+
+      // Set up task.
+      baseOutputCommitter.setupTask(currTaskContext);
+
+      Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir"));
+      Path childPath =
+          new Path(parentDir, FileOutputFormat.getUniqueFile(currTaskContext, "part", ""));
+
+      RecordWriter baseRecordWriter =
+          baseOF.getRecordWriter(parentDir.getFileSystem(currTaskContext.getConfiguration()),
+              currTaskContext.getJobConf(), childPath.toString(),
+              InternalUtil.createReporter(currTaskContext));
+
+      baseDynamicWriters.put(dynKey, baseRecordWriter);
+      baseDynamicSerDe.put(dynKey, currSerDe);
+      baseDynamicCommitters.put(dynKey, baseOutputCommitter);
+      dynamicContexts.put(dynKey, currTaskContext);
+      dynamicObjectInspectors.put(dynKey,
+          InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
+      dynamicOutputJobInfo.put(dynKey,
+          HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey).getConfiguration()));
+    }
+
+    return new LocalFileWriter(baseDynamicWriters.get(dynKey), dynamicObjectInspectors.get(dynKey),
+        baseDynamicSerDe.get(dynKey), dynamicOutputJobInfo.get(dynKey));
+  }
+
+  protected void configureDynamicStorageHandler(JobContext context, List<String> dynamicPartVals)
+      throws IOException {
+    HCatOutputFormat.configureOutputStorageHandler(context, dynamicPartVals);
+  }
+}
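
The core of the dynamic path above is a per-partition-key cache: the key is the string form of the record's dynamic partition values, and the first record seen for a key triggers creation of a base writer, SerDe, committer and task context for that partition, capped by the limit behind HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS. A stripped-down sketch of the same caching pattern, with invented names and none of the Hadoop/Hive setup:

    // Hypothetical illustration only: the real class caches RecordWriters, SerDes,
    // committers and task contexts keyed the same way.
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    abstract class PartitionKeyedWriterCache<W> {
      private final Map<String, W> writersByKey = new HashMap<String, W>();
      private final List<Integer> dynamicPartCols; // positions of the dynamic partition columns
      private final int maxDynamicPartitions;      // -1 means "no limit", as in the real code

      PartitionKeyedWriterCache(List<Integer> dynamicPartCols, int maxDynamicPartitions) {
        this.dynamicPartCols = dynamicPartCols;
        this.maxDynamicPartitions = maxDynamicPartitions;
      }

      /** Creates whatever per-partition state a new key needs (writer, SerDe, committer, ...). */
      protected abstract W createWriter(List<String> dynamicPartValues);

      /** Returns the writer for this record's dynamic partition values, creating it on first use. */
      W writerFor(List<Object> record) {
        List<String> dynamicPartValues = new ArrayList<String>();
        for (Integer col : dynamicPartCols) {
          dynamicPartValues.add(record.get(col).toString());
        }
        String dynKey = dynamicPartValues.toString(); // same key scheme as the real code
        W writer = writersByKey.get(dynKey);
        if (writer == null) {
          if (maxDynamicPartitions != -1 && writersByKey.size() > maxDynamicPartitions) {
            throw new IllegalStateException("Too many dynamic partitions: " + writersByKey.size());
          }
          writer = createWriter(dynamicPartValues);
          writersByKey.put(dynKey, writer);
        }
        return writer;
      }
    }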

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java?rev=1604731&r1=1604730&r2=1604731&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java Mon Jun 23 10:17:29 2014
@@ -95,18 +95,19 @@ class FileOutputFormatContainer extends 
       // When Dynamic partitioning is used, the RecordWriter instance initialized here isn't used. Can use null.
       // (That's because records can't be written until the values of the dynamic partitions are deduced.
       // By that time, a new local instance of RecordWriter, with the correct output-path, will be constructed.)
-      rw = new FileRecordWriterContainer((org.apache.hadoop.mapred.RecordWriter)null,context);
+      rw = new DynamicPartitionFileRecordWriterContainer(
+          (org.apache.hadoop.mapred.RecordWriter)null, context);
     } else {
       Path parentDir = new Path(context.getConfiguration().get("mapred.work.output.dir"));
       Path childPath = new Path(parentDir,FileOutputFormat.getUniqueName(new JobConf(context.getConfiguration()), "part"));
 
-      rw = new FileRecordWriterContainer(
-            getBaseOutputFormat().getRecordWriter(
-                parentDir.getFileSystem(context.getConfiguration()),
-                new JobConf(context.getConfiguration()),
-                childPath.toString(),
-                InternalUtil.createReporter(context)),
-            context);
+      rw = new StaticPartitionFileRecordWriterContainer(
+          getBaseOutputFormat().getRecordWriter(
+              parentDir.getFileSystem(context.getConfiguration()),
+              new JobConf(context.getConfiguration()),
+              childPath.toString(),
+              InternalUtil.createReporter(context)),
+          context);
     }
     return rw;
   }

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java?rev=1604731&r1=1604730&r2=1604731&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java Mon Jun 23 10:17:29 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -47,31 +48,19 @@ import org.apache.hive.hcatalog.common.H
 import org.apache.hive.hcatalog.data.HCatRecord;
 
 /**
- * Part of the FileOutput*Container classes
- * See {@link FileOutputFormatContainer} for more information
+ * Part of the FileOutput*Container classes. See {@link FileOutputFormatContainer} for more
+ * information.
  */
-class FileRecordWriterContainer extends RecordWriterContainer {
-
-  private final HiveStorageHandler storageHandler;
-  private final SerDe serDe;
-  private final ObjectInspector objectInspector;
-
-  private boolean dynamicPartitioningUsed = false;
-
-  private final Map<String, org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>> baseDynamicWriters;
-  private final Map<String, SerDe> baseDynamicSerDe;
-  private final Map<String, org.apache.hadoop.mapred.OutputCommitter> baseDynamicCommitters;
-  private final Map<String, org.apache.hadoop.mapred.TaskAttemptContext> dynamicContexts;
-  private final Map<String, ObjectInspector> dynamicObjectInspectors;
-  private Map<String, OutputJobInfo> dynamicOutputJobInfo;
+abstract class FileRecordWriterContainer extends RecordWriterContainer {
 
+  protected final HiveStorageHandler storageHandler;
+  protected final SerDe serDe;
+  protected final ObjectInspector objectInspector;
 
   private final List<Integer> partColsToDel;
-  private final List<Integer> dynamicPartCols;
-  private int maxDynamicPartitions;
 
-  private OutputJobInfo jobInfo;
-  private TaskAttemptContext context;
+  protected OutputJobInfo jobInfo;
+  protected TaskAttemptContext context;
 
   /**
    * @param baseWriter RecordWriter to contain
@@ -79,13 +68,16 @@ class FileRecordWriterContainer extends 
    * @throws IOException
    * @throws InterruptedException
    */
-  public FileRecordWriterContainer(org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
-                   TaskAttemptContext context) throws IOException, InterruptedException {
+  public FileRecordWriterContainer(
+      RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
+      TaskAttemptContext context) throws IOException, InterruptedException {
     super(context, baseWriter);
     this.context = context;
     jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration());
 
-    storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
+    storageHandler =
+        HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo()
+            .getStorerInfo());
     serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration());
     objectInspector = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema());
     try {
@@ -96,30 +88,9 @@ class FileRecordWriterContainer extends 
 
     // If partition columns occur in data, we want to remove them.
     partColsToDel = jobInfo.getPosOfPartCols();
-    dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed();
-    dynamicPartCols = jobInfo.getPosOfDynPartCols();
-    maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
-
-    if ((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))) {
-      throw new HCatException("It seems that setSchema() is not called on " +
-        "HCatOutputFormat. Please make sure that method is called.");
-    }
-
-
-    if (!dynamicPartitioningUsed) {
-      this.baseDynamicSerDe = null;
-      this.baseDynamicWriters = null;
-      this.baseDynamicCommitters = null;
-      this.dynamicContexts = null;
-      this.dynamicObjectInspectors = null;
-      this.dynamicOutputJobInfo = null;
-    } else {
-      this.baseDynamicSerDe = new HashMap<String, SerDe>();
-      this.baseDynamicWriters = new HashMap<String, org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>>();
-      this.baseDynamicCommitters = new HashMap<String, org.apache.hadoop.mapred.OutputCommitter>();
-      this.dynamicContexts = new HashMap<String, org.apache.hadoop.mapred.TaskAttemptContext>();
-      this.dynamicObjectInspectors = new HashMap<String, ObjectInspector>();
-      this.dynamicOutputJobInfo = new HashMap<String, OutputJobInfo>();
+    if (partColsToDel == null) {
+      throw new HCatException("It seems that setSchema() is not called on "
+          + "HCatOutputFormat. Please make sure that method is called.");
     }
   }
 
@@ -130,138 +101,59 @@ class FileRecordWriterContainer extends 
     return storageHandler;
   }
 
-  @Override
-  public void close(TaskAttemptContext context) throws IOException,
-    InterruptedException {
-    Reporter reporter = InternalUtil.createReporter(context);
-    if (dynamicPartitioningUsed) {
-      for (org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()) {
-        //We are in RecordWriter.close() make sense that the context would be TaskInputOutput
-        bwriter.close(reporter);
-      }
-      for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry : baseDynamicCommitters.entrySet()) {
-        org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
-        OutputCommitter baseOutputCommitter = entry.getValue();
-        if (baseOutputCommitter.needsTaskCommit(currContext)) {
-          baseOutputCommitter.commitTask(currContext);
-        }
-      }
-    } else {
-      getBaseRecordWriter().close(reporter);
-    }
-  }
+  protected abstract LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOException,
+      HCatException;
 
   @Override
   public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
-    InterruptedException {
-
-    org.apache.hadoop.mapred.RecordWriter localWriter;
-    ObjectInspector localObjectInspector;
-    SerDe localSerDe;
-    OutputJobInfo localJobInfo = null;
-
-    if (dynamicPartitioningUsed) {
-      // calculate which writer to use from the remaining values - this needs to be done before we delete cols
-      List<String> dynamicPartValues = new ArrayList<String>();
-      for (Integer colToAppend : dynamicPartCols) {
-        dynamicPartValues.add(value.get(colToAppend).toString());
-      }
-
-      String dynKey = dynamicPartValues.toString();
-      if (!baseDynamicWriters.containsKey(dynKey)) {
-        if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) {
-          throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
-            "Number of dynamic partitions being created "
-              + "exceeds configured max allowable partitions["
-              + maxDynamicPartitions
-              + "], increase parameter ["
-              + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
-              + "] if needed.");
-        }
-
-        org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context);
-        configureDynamicStorageHandler(currTaskContext, dynamicPartValues);
-        localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext.getConfiguration());
-
-        //setup serDe
-        SerDe currSerDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf());
-        try {
-          InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(), localJobInfo);
-        } catch (SerDeException e) {
-          throw new IOException("Failed to initialize SerDe", e);
-        }
-
-        //create base OutputFormat
-        org.apache.hadoop.mapred.OutputFormat baseOF =
-          ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf());
-
-        //We are skipping calling checkOutputSpecs() for each partition
-        //As it can throw a FileAlreadyExistsException when more than one mapper is writing to a partition
-        //See HCATALOG-490, also to avoid contacting the namenode for each new FileOutputFormat instance
-        //In general this should be ok for most FileOutputFormat implementations
-        //but may become an issue for cases when the method is used to perform other setup tasks
-
-        //get Output Committer
-        org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = currTaskContext.getJobConf().getOutputCommitter();
-        //create currJobContext the latest so it gets all the config changes
-        org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currTaskContext);
-        //setupJob()
-        baseOutputCommitter.setupJob(currJobContext);
-        //recreate to refresh jobConf of currTask context
-        currTaskContext =
-          HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
-            currTaskContext.getTaskAttemptID(),
-            currTaskContext.getProgressible());
-        //set temp location
-        currTaskContext.getConfiguration().set("mapred.work.output.dir",
-          new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext).getWorkPath().toString());
-        //setupTask()
-        baseOutputCommitter.setupTask(currTaskContext);
-
-        Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir"));
-        Path childPath = new Path(parentDir,FileOutputFormat.getUniqueFile(currTaskContext, "part", ""));
-
-        org.apache.hadoop.mapred.RecordWriter baseRecordWriter =
-          baseOF.getRecordWriter(
-            parentDir.getFileSystem(currTaskContext.getConfiguration()),
-            currTaskContext.getJobConf(),
-            childPath.toString(),
-            InternalUtil.createReporter(currTaskContext));
-
-        baseDynamicWriters.put(dynKey, baseRecordWriter);
-        baseDynamicSerDe.put(dynKey, currSerDe);
-        baseDynamicCommitters.put(dynKey, baseOutputCommitter);
-        dynamicContexts.put(dynKey, currTaskContext);
-        dynamicObjectInspectors.put(dynKey, InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
-        dynamicOutputJobInfo.put(dynKey, HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey).getConfiguration()));
-      }
-
-      localJobInfo = dynamicOutputJobInfo.get(dynKey);
-      localWriter = baseDynamicWriters.get(dynKey);
-      localSerDe = baseDynamicSerDe.get(dynKey);
-      localObjectInspector = dynamicObjectInspectors.get(dynKey);
-    } else {
-      localJobInfo = jobInfo;
-      localWriter = getBaseRecordWriter();
-      localSerDe = serDe;
-      localObjectInspector = objectInspector;
-    }
+      InterruptedException {
+    LocalFileWriter localFileWriter = getLocalFileWriter(value);
+    RecordWriter localWriter = localFileWriter.getLocalWriter();
+    ObjectInspector localObjectInspector = localFileWriter.getLocalObjectInspector();
+    SerDe localSerDe = localFileWriter.getLocalSerDe();
+    OutputJobInfo localJobInfo = localFileWriter.getLocalJobInfo();
 
     for (Integer colToDel : partColsToDel) {
       value.remove(colToDel);
     }
 
-
-    //The key given by user is ignored
+    // The key given by the user is ignored.
     try {
-      localWriter.write(NullWritable.get(), localSerDe.serialize(value.getAll(), localObjectInspector));
+      localWriter.write(NullWritable.get(),
+          localSerDe.serialize(value.getAll(), localObjectInspector));
     } catch (SerDeException e) {
       throw new IOException("Failed to serialize object", e);
     }
   }
 
-  protected void configureDynamicStorageHandler(JobContext context, List<String> dynamicPartVals) throws IOException {
-    HCatOutputFormat.configureOutputStorageHandler(context, dynamicPartVals);
-  }
+  class LocalFileWriter {
+    private RecordWriter localWriter;
+    private ObjectInspector localObjectInspector;
+    private SerDe localSerDe;
+    private OutputJobInfo localJobInfo;
 
+    public LocalFileWriter(RecordWriter localWriter, ObjectInspector localObjectInspector,
+        SerDe localSerDe, OutputJobInfo localJobInfo) {
+      this.localWriter = localWriter;
+      this.localObjectInspector = localObjectInspector;
+      this.localSerDe = localSerDe;
+      this.localJobInfo = localJobInfo;
+    }
+
+    public RecordWriter getLocalWriter() {
+      return localWriter;
+    }
+
+    public ObjectInspector getLocalObjectInspector() {
+      return localObjectInspector;
+    }
+
+    public SerDe getLocalSerDe() {
+      return localSerDe;
+    }
+
+    public OutputJobInfo getLocalJobInfo() {
+      return localJobInfo;
+    }
+  }
 }
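
With this split, FileRecordWriterContainer becomes a Template Method base class: write() owns the shared path (drop partition columns, serialize via the SerDe, write with a NullWritable key), and a subclass only has to say which writer, ObjectInspector, SerDe and OutputJobInfo a given record should use, plus how to close. A hypothetical pass-through subclass (invented name; it ends up looking much like the static-partition container added below) shows how small that contract is:

    // Hypothetical subclass; it would have to live in org.apache.hive.hcatalog.mapreduce
    // because FileRecordWriterContainer and LocalFileWriter are package-private.
    package org.apache.hive.hcatalog.mapreduce;

    import java.io.IOException;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hive.hcatalog.common.HCatException;
    import org.apache.hive.hcatalog.data.HCatRecord;

    class PassThroughFileRecordWriterContainer extends FileRecordWriterContainer {
      PassThroughFileRecordWriterContainer(
          RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
          TaskAttemptContext context) throws IOException, InterruptedException {
        super(baseWriter, context);
      }

      @Override
      protected LocalFileWriter getLocalFileWriter(HCatRecord value)
          throws IOException, HCatException {
        // Route every record to the single base writer, reusing the SerDe, ObjectInspector
        // and OutputJobInfo the base class already initialized.
        return new LocalFileWriter(getBaseRecordWriter(), objectInspector, serDe, jobInfo);
      }

      @Override
      public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // Nothing partition-specific to commit here; just close the wrapped writer.
        getBaseRecordWriter().close(InternalUtil.createReporter(context));
      }
    }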

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java?rev=1604731&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java Mon Jun 23 10:17:29 2014
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hive.hcatalog.mapreduce.FileRecordWriterContainer;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+/**
+ * Record writer container for tables using static partitioning. See
+ * {@link FileOutputFormatContainer} for more information.
+ */
+class StaticPartitionFileRecordWriterContainer extends FileRecordWriterContainer {
+  /**
+   * @param baseWriter RecordWriter to contain
+   * @param context current TaskAttemptContext
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public StaticPartitionFileRecordWriterContainer(
+      RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    super(baseWriter, context);
+  }
+
+  @Override
+  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+    Reporter reporter = InternalUtil.createReporter(context);
+    getBaseRecordWriter().close(reporter);
+  }
+
+  @Override
+  protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOException, HCatException {
+    return new LocalFileWriter(getBaseRecordWriter(), objectInspector, serDe, jobInfo);
+  }
+}
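
From a client's point of view nothing changes: which of the two containers a job gets is decided by how the output is configured. A hedged driver sketch follows (database, table and partition names are made up; OutputJobInfo.create and HCatOutputFormat.setOutput/setSchema/getTableSchema are the usual HCatalog calls, but the exact overloads and the null-means-dynamic convention are stated to the best of my knowledge, not taken from this diff):

    // Hypothetical driver fragment (imports assumed: org.apache.hadoop.conf.Configuration,
    // org.apache.hadoop.mapreduce.Job, java.util.HashMap, java.util.Map,
    // org.apache.hive.hcatalog.mapreduce.HCatOutputFormat and OutputJobInfo).
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setOutputFormatClass(HCatOutputFormat.class);

    // Static partitioning: every partition key has a value up front, so records go through
    // StaticPartitionFileRecordWriterContainer and a single base RecordWriter.
    Map<String, String> partitionValues = new HashMap<String, String>();
    partitionValues.put("ds", "2014-06-23");
    HCatOutputFormat.setOutput(job, OutputJobInfo.create("default", "example_table", partitionValues));

    // Dynamic partitioning: pass null (or an incomplete map) and the partition values are read
    // from each record, which is the DynamicPartitionFileRecordWriterContainer path.
    // HCatOutputFormat.setOutput(job, OutputJobInfo.create("default", "example_table", null));

    // Either way setSchema() must be called; both new containers throw an HCatException
    // ("It seems that setSchema() is not called on HCatOutputFormat ...") when the partition
    // column positions are missing from the job info.
    HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job.getConfiguration()));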