Posted to commits@hive.apache.org by kh...@apache.org on 2014/11/03 00:49:24 UTC

svn commit: r1636213 - in /hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce: DynamicPartitionFileRecordWriterContainer.java FileOutputCommitterContainer.java TaskCommitContextRegistry.java

Author: khorgath
Date: Sun Nov  2 23:49:23 2014
New Revision: 1636213

URL: http://svn.apache.org/r1636213
Log:
HIVE-8394 : HIVE-7803 doesn't handle Pig MultiQuery, can cause data-loss. (Mithun Radhakrishnan via Sushanth Sowmyan)

Added:
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java
Modified:
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java?rev=1636213&r1=1636212&r2=1636213&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java Sun Nov  2 23:49:23 2014
@@ -27,11 +27,9 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.Reporter;
@@ -44,14 +42,16 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hive.hcatalog.common.ErrorType;
 import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hive.hcatalog.data.HCatRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Record writer container for tables using dynamic partitioning. See
  * {@link FileOutputFormatContainer} for more information
  */
 class DynamicPartitionFileRecordWriterContainer extends FileRecordWriterContainer {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionFileRecordWriterContainer.class);
   private final List<Integer> dynamicPartCols;
   private int maxDynamicPartitions;
 
@@ -97,14 +97,36 @@ class DynamicPartitionFileRecordWriterCo
       // TaskInputOutput.
       bwriter.close(reporter);
     }
-    for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry : baseDynamicCommitters
-        .entrySet()) {
-      org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
-      OutputCommitter baseOutputCommitter = entry.getValue();
-      if (baseOutputCommitter.needsTaskCommit(currContext)) {
-        baseOutputCommitter.commitTask(currContext);
+
+    TaskCommitContextRegistry.getInstance().register(context, new TaskCommitContextRegistry.TaskCommitterProxy() {
+      @Override
+      public void abortTask(TaskAttemptContext context) throws IOException {
+        for (Map.Entry<String, OutputJobInfo> outputJobInfoEntry : dynamicOutputJobInfo.entrySet()) {
+          String dynKey = outputJobInfoEntry.getKey();
+          OutputJobInfo outputJobInfo = outputJobInfoEntry.getValue();
+          LOG.info("Aborting task-attempt for " + outputJobInfo.getLocation());
+          baseDynamicCommitters.get(dynKey)
+                               .abortTask(dynamicContexts.get(dynKey));
+        }
       }
-    }
+
+      @Override
+      public void commitTask(TaskAttemptContext context) throws IOException {
+        for (Map.Entry<String, OutputJobInfo> outputJobInfoEntry : dynamicOutputJobInfo.entrySet()) {
+          String dynKey = outputJobInfoEntry.getKey();
+          OutputJobInfo outputJobInfo = outputJobInfoEntry.getValue();
+          LOG.info("Committing task-attempt for " + outputJobInfo.getLocation());
+          TaskAttemptContext dynContext = dynamicContexts.get(dynKey);
+          OutputCommitter dynCommitter = baseDynamicCommitters.get(dynKey);
+          if (dynCommitter.needsTaskCommit(dynContext)) {
+            dynCommitter.commitTask(dynContext);
+          }
+          else {
+            LOG.info("Skipping commitTask() for " + outputJobInfo.getLocation());
+          }
+        }
+      }
+    });
   }
 
   @Override

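The change above replaces the eager per-partition commit in close() with a registered call-back: rather than committing each base committer itself, the record writer hands a TaskCommitterProxy to TaskCommitContextRegistry, and the real OutputCommitter drives commit or abort later. Under Pig MultiQuery, several stores can share one task attempt, so committing from close() could finalize output prematurely. A minimal, self-contained sketch of this deferred-commit pattern follows; the names DeferredCommitRegistry and CommitCallback are illustrative, not HCatalog API.

    // Illustrative sketch of the deferred-commit pattern; not HCatalog API.
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    interface CommitCallback {
      void commitTask() throws IOException;
      void abortTask() throws IOException;
    }

    class DeferredCommitRegistry {
      private final Map<String, CommitCallback> callbacks =
          new HashMap<String, CommitCallback>();

      // Called from the record writer's close(): register clean-up logic
      // instead of committing immediately.
      synchronized void register(String key, CommitCallback callback) {
        callbacks.put(key, callback);
      }

      // Called later by the OutputCommitter, once the framework decides
      // the task attempt should actually commit.
      synchronized void commitTask(String key) throws IOException {
        CommitCallback callback = callbacks.get(key);
        if (callback == null) {
          throw new IOException("No callback registered for " + key);
        }
        callback.commitTask();
      }

      synchronized void abortTask(String key) throws IOException {
        CommitCallback callback = callbacks.get(key);
        if (callback == null) {
          throw new IOException("No callback registered for " + key);
        }
        callback.abortTask();
      }

      // Drop the callback once the attempt is resolved, so a retried
      // attempt does not see stale state.
      synchronized void discard(String key) {
        callbacks.remove(key);
      }
    }
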
Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1636213&r1=1636212&r2=1636213&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java Sun Nov  2 23:49:23 2014
@@ -118,6 +118,13 @@ class FileOutputCommitterContainer exten
   public void abortTask(TaskAttemptContext context) throws IOException {
     if (!dynamicPartitioningUsed) {
       getBaseOutputCommitter().abortTask(HCatMapRedUtil.createTaskAttemptContext(context));
+    } else {
+      try {
+        TaskCommitContextRegistry.getInstance().abortTask(context);
+      }
+      finally {
+        TaskCommitContextRegistry.getInstance().discardCleanupFor(context);
+      }
     }
   }
 
@@ -127,6 +134,13 @@ class FileOutputCommitterContainer exten
          //See HCATALOG-499
       FileOutputFormatContainer.setWorkOutputPath(context);
       getBaseOutputCommitter().commitTask(HCatMapRedUtil.createTaskAttemptContext(context));
+    } else {
+      try {
+        TaskCommitContextRegistry.getInstance().commitTask(context);
+      }
+      finally {
+        TaskCommitContextRegistry.getInstance().discardCleanupFor(context);
+      }
     }
   }
 
@@ -136,7 +150,7 @@ class FileOutputCommitterContainer exten
       return getBaseOutputCommitter().needsTaskCommit(HCatMapRedUtil.createTaskAttemptContext(context));
     } else {
-      // called explicitly through FileRecordWriterContainer.close() if dynamic - return false by default
-      return false;
+      // dynamic-partition commits are now driven through TaskCommitContextRegistry via commitTask()
+      return true;
     }
   }
 

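Note the flip from false to true in needsTaskCommit() above: the MapReduce framework invokes commitTask() only for attempts whose committer answers true, so the dynamic-partitioning path has to opt in before the registry call-backs can fire; the try/finally blocks then guarantee the registered call-back is discarded even if commit or abort throws. A simplified sketch of how the framework drives that protocol (the OutputCommitter methods are real Hadoop API; the driver class is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    class CommitProtocolSketch {
      // Simplified version of what the MR framework does at task end.
      // If needsTaskCommit() returned false (the old behaviour for the
      // dynamic-partitioning case), commitTask() -- and therefore the
      // TaskCommitContextRegistry call-backs -- would never run.
      static void commitIfNeeded(OutputCommitter committer,
                                 TaskAttemptContext context) throws IOException {
        if (committer.needsTaskCommit(context)) {
          committer.commitTask(context);
        }
      }
    }
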
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java?rev=1636213&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java Sun Nov  2 23:49:23 2014
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+/**
+ * Singleton Registry to track the commit of TaskAttempts.
+ * Used to manage commits for Tasks that create dynamic-partitions.
+ */
+public class TaskCommitContextRegistry {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskCommitContextRegistry.class);
+
+  private static TaskCommitContextRegistry ourInstance = new TaskCommitContextRegistry();
+
+  /**
+   * Singleton instance getter.
+   */
+  public static TaskCommitContextRegistry getInstance() {
+    return ourInstance;
+  }
+
+  /**
+   * Implement this interface to register call-backs for committing TaskAttempts.
+   */
+  public static interface TaskCommitterProxy {
+
+    /**
+     * Call-back for Committer's abortTask().
+     */
+    public void abortTask(TaskAttemptContext context) throws IOException;
+
+    /**
+     * Call-back for Committer's commitTask().
+     */
+    public void commitTask(TaskAttemptContext context) throws IOException;
+  }
+
+  private HashMap<String, TaskCommitterProxy> taskCommitters
+                    = new HashMap<String, TaskCommitterProxy>();
+
+  /**
+   * Trigger commit for TaskAttempt, as specified by the TaskAttemptContext argument.
+   */
+  public synchronized void commitTask(TaskAttemptContext context) throws IOException {
+    String key = generateKey(context);
+    if (!taskCommitters.containsKey(key)) {
+      throw new IOException("No callback registered for TaskAttemptID:" + key);
+    }
+
+    try {
+      LOG.info("Committing TaskAttempt:" + key);
+      taskCommitters.get(key).commitTask(context);
+    }
+    catch (Throwable t) {
+      throw new IOException("Could not clean up TaskAttemptID:" + key, t);
+    }
+
+  }
+
+  private String generateKey(TaskAttemptContext context) throws IOException {
+    String jobInfoString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+    if (StringUtils.isBlank(jobInfoString)) { // Avoid the NPE.
+      throw new IOException("Could not retrieve OutputJobInfo for TaskAttempt " + context.getTaskAttemptID());
+    }
+    OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(jobInfoString);
+    return context.getTaskAttemptID().toString() + "@" + jobInfo.getLocation();
+  }
+
+  /**
+   * Trigger abort for TaskAttempt, as specified by the TaskAttemptContext argument.
+   */
+  public synchronized void abortTask(TaskAttemptContext context) throws IOException {
+    String key = generateKey(context);
+    if (!taskCommitters.containsKey(key)) {
+      throw new IOException("No callback registered for TaskAttemptID:" + key);
+    }
+
+    try {
+      LOG.info("Aborting TaskAttempt:" + key);
+      taskCommitters.get(key).abortTask(context);
+    }
+    catch (Throwable t) {
+      throw new IOException("Could not clean up TaskAttemptID:" + key, t);
+    }
+  }
+
+  /**
+   * Method to register call-backs to control commits and aborts of TaskAttempts.
+   * @param context The TaskAttemptContext instance for the task-attempt, identifying the output.
+   * @param committer Instance of TaskCommitterProxy, to commit/abort a TaskAttempt.
+   * @throws java.io.IOException On failure.
+   */
+  public synchronized void register(TaskAttemptContext context, TaskCommitterProxy committer) throws IOException {
+    String key = generateKey(context);
+    LOG.info("Registering committer for TaskAttemptID:" + key);
+    if (taskCommitters.containsKey(key)) {
+      LOG.warn("Replacing previous committer:" + committer);
+    }
+    taskCommitters.put(key, committer);
+  }
+
+  /**
+   * Method to discard the committer call-backs for a specified TaskAttemptID.
+   * @param context The TaskAttemptContext instance for the task-attempt, identifying the output.
+   * @throws java.io.IOException On failure.
+   */
+  public synchronized void discardCleanupFor(TaskAttemptContext context) throws IOException {
+    String key = generateKey(context);
+    LOG.info("Discarding all cleanup for TaskAttemptID:" + key);
+    if (!taskCommitters.containsKey(key)) {
+      LOG.warn("No committer registered for TaskAttemptID:" + key);
+    }
+    else {
+      taskCommitters.remove(key);
+    }
+  }
+
+  // Private constructor, to enforce the Singleton.
+  private TaskCommitContextRegistry() {
+  }
+}
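
One detail worth noting in the file above: generateKey() combines the TaskAttemptID with the output location. Under Pig MultiQuery a single task attempt can write several outputs, so keying the registry on the TaskAttemptID alone would let one store's committer silently replace another's. A stand-alone illustration of that keying scheme (the attempt ID and paths below are made up):

    import java.util.HashMap;
    import java.util.Map;

    public class RegistryKeyDemo {
      public static void main(String[] args) {
        Map<String, String> committers = new HashMap<String, String>();
        String attempt = "attempt_201411020000_0001_m_000000_0";

        // Two stores from the same task attempt (the MultiQuery case)
        // register under distinct keys because the location differs:
        committers.put(attempt + "@/warehouse/table_a", "committer-for-a");
        committers.put(attempt + "@/warehouse/table_b", "committer-for-b");

        // Neither registration clobbers the other:
        System.out.println(committers.get(attempt + "@/warehouse/table_a"));
        System.out.println(committers.get(attempt + "@/warehouse/table_b"));
      }
    }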