You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2014/11/03 00:49:38 UTC
svn commit: r1636214 - in
/hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce:
DynamicPartitionFileRecordWriterContainer.java
FileOutputCommitterContainer.java TaskCommitContextRegistry.java
Author: khorgath
Date: Sun Nov 2 23:49:37 2014
New Revision: 1636214
URL: http://svn.apache.org/r1636214
Log:
HIVE-8394 : HIVE-7803 doesn't handle Pig MultiQuery, can cause data-loss. (Mithun Radhakrishnan via Sushanth Sowmyan)
Added:
hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java
Modified:
hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
Modified: hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java?rev=1636214&r1=1636213&r2=1636214&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java (original)
+++ hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java Sun Nov 2 23:49:37 2014
@@ -27,11 +27,9 @@ import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.Reporter;
@@ -44,14 +42,16 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hive.hcatalog.common.ErrorType;
import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.data.HCatRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Record writer container for tables using dynamic partitioning. See
* {@link FileOutputFormatContainer} for more information
*/
class DynamicPartitionFileRecordWriterContainer extends FileRecordWriterContainer {
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionFileRecordWriterContainer.class);
private final List<Integer> dynamicPartCols;
private int maxDynamicPartitions;
@@ -97,14 +97,36 @@ class DynamicPartitionFileRecordWriterCo
// TaskInputOutput.
bwriter.close(reporter);
}
- for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry : baseDynamicCommitters
- .entrySet()) {
- org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
- OutputCommitter baseOutputCommitter = entry.getValue();
- if (baseOutputCommitter.needsTaskCommit(currContext)) {
- baseOutputCommitter.commitTask(currContext);
+
+ TaskCommitContextRegistry.getInstance().register(context, new TaskCommitContextRegistry.TaskCommitterProxy() {
+ @Override
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ for (Map.Entry<String, OutputJobInfo> outputJobInfoEntry : dynamicOutputJobInfo.entrySet()) {
+ String dynKey = outputJobInfoEntry.getKey();
+ OutputJobInfo outputJobInfo = outputJobInfoEntry.getValue();
+ LOG.info("Aborting task-attempt for " + outputJobInfo.getLocation());
+ baseDynamicCommitters.get(dynKey)
+ .abortTask(dynamicContexts.get(dynKey));
+ }
}
- }
+
+ @Override
+ public void commitTask(TaskAttemptContext context) throws IOException {
+ for (Map.Entry<String, OutputJobInfo> outputJobInfoEntry : dynamicOutputJobInfo.entrySet()) {
+ String dynKey = outputJobInfoEntry.getKey();
+ OutputJobInfo outputJobInfo = outputJobInfoEntry.getValue();
+ LOG.info("Committing task-attempt for " + outputJobInfo.getLocation());
+ TaskAttemptContext dynContext = dynamicContexts.get(dynKey);
+ OutputCommitter dynCommitter = baseDynamicCommitters.get(dynKey);
+ if (dynCommitter.needsTaskCommit(dynContext)) {
+ dynCommitter.commitTask(dynContext);
+ }
+ else {
+ LOG.info("Skipping commitTask() for " + outputJobInfo.getLocation());
+ }
+ }
+ }
+ });
}
@Override
Modified: hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1636214&r1=1636213&r2=1636214&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java Sun Nov 2 23:49:37 2014
@@ -118,6 +118,13 @@ class FileOutputCommitterContainer exten
public void abortTask(TaskAttemptContext context) throws IOException {
if (!dynamicPartitioningUsed) {
getBaseOutputCommitter().abortTask(HCatMapRedUtil.createTaskAttemptContext(context));
+ } else {
+ try {
+ TaskCommitContextRegistry.getInstance().abortTask(context);
+ }
+ finally {
+ TaskCommitContextRegistry.getInstance().discardCleanupFor(context);
+ }
}
}
@@ -127,6 +134,13 @@ class FileOutputCommitterContainer exten
//See HCATALOG-499
FileOutputFormatContainer.setWorkOutputPath(context);
getBaseOutputCommitter().commitTask(HCatMapRedUtil.createTaskAttemptContext(context));
+ } else {
+ try {
+ TaskCommitContextRegistry.getInstance().commitTask(context);
+ }
+ finally {
+ TaskCommitContextRegistry.getInstance().discardCleanupFor(context);
+ }
}
}
@@ -136,7 +150,7 @@ class FileOutputCommitterContainer exten
return getBaseOutputCommitter().needsTaskCommit(HCatMapRedUtil.createTaskAttemptContext(context));
} else {
// called explicitly through FileRecordWriterContainer.close() if dynamic - return false by default
- return false;
+ return true;
}
}
Added: hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java?rev=1636214&view=auto
==============================================================================
--- hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java (added)
+++ hive/branches/branch-0.14/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/TaskCommitContextRegistry.java Sun Nov 2 23:49:37 2014
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+/**
+ * Singleton Registry to track the commit of TaskAttempts.
+ * Used to manage commits for Tasks that create dynamic-partitions.
+ */
+public class TaskCommitContextRegistry {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TaskCommitContextRegistry.class);
+
+ private static TaskCommitContextRegistry ourInstance = new TaskCommitContextRegistry();
+
+ /**
+ * Singleton instance getter.
+ */
+ public static TaskCommitContextRegistry getInstance() {
+ return ourInstance;
+ }
+
+ /**
+ * Implement this interface to register call-backs for committing TaskAttempts.
+ */
+ public static interface TaskCommitterProxy {
+
+ /**
+ * Call-back for Committer's abortTask().
+ */
+ public void abortTask(TaskAttemptContext context) throws IOException;
+
+ /**
+ * Call-back for Committer's commitTask().
+ */
+ public void commitTask(TaskAttemptContext context) throws IOException;
+ }
+
+ private HashMap<String, TaskCommitterProxy> taskCommitters
+ = new HashMap<String, TaskCommitterProxy>();
+
+ /**
+ * Trigger commit for TaskAttempt, as specified by the TaskAttemptContext argument.
+ */
+ public synchronized void commitTask(TaskAttemptContext context) throws IOException {
+ String key = generateKey(context);
+ if (!taskCommitters.containsKey(key)) {
+ throw new IOException("No callback registered for TaskAttemptID:" + key);
+ }
+
+ try {
+ LOG.info("Committing TaskAttempt:" + key);
+ taskCommitters.get(key).commitTask(context);
+ }
+ catch (Throwable t) {
+ throw new IOException("Could not clean up TaskAttemptID:" + key, t);
+ }
+
+ }
+
+ private String generateKey(TaskAttemptContext context) throws IOException {
+ String jobInfoString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ if (StringUtils.isBlank(jobInfoString)) { // Avoid the NPE.
+ throw new IOException("Could not retrieve OutputJobInfo for TaskAttempt " + context.getTaskAttemptID());
+ }
+ OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(jobInfoString);
+ return context.getTaskAttemptID().toString() + "@" + jobInfo.getLocation();
+ }
+
+ /**
+ * Trigger abort for TaskAttempt, as specified by the TaskAttemptContext argument.
+ */
+ public synchronized void abortTask(TaskAttemptContext context) throws IOException {
+ String key = generateKey(context);
+ if (!taskCommitters.containsKey(key)) {
+ throw new IOException("No callback registered for TaskAttemptID:" + key);
+ }
+
+ try {
+ LOG.info("Aborting TaskAttempt:" + key);
+ taskCommitters.get(key).abortTask(context);
+ }
+ catch (Throwable t) {
+ throw new IOException("Could not clean up TaskAttemptID:" + key, t);
+ }
+ }
+
+ /**
+ * Method to register call-backs to control commits and aborts of TaskAttempts.
+ * @param context The TaskAttemptContext instance for the task-attempt, identifying the output.
+ * @param committer Instance of TaskCommitterProxy, to commit/abort a TaskAttempt.
+ * @throws java.io.IOException On failure.
+ */
+ public synchronized void register(TaskAttemptContext context, TaskCommitterProxy committer) throws IOException {
+ String key = generateKey(context);
+ LOG.info("Registering committer for TaskAttemptID:" + key);
+ if (taskCommitters.containsKey(key)) {
+ LOG.warn("Replacing previous committer:" + committer);
+ }
+ taskCommitters.put(key, committer);
+ }
+
+ /**
+ * Method to discard the committer call-backs for a specified TaskAttemptID.
+ * @param context The TaskAttemptContext instance for the task-attempt, identifying the output.
+ * @throws java.io.IOException On failure.
+ */
+ public synchronized void discardCleanupFor(TaskAttemptContext context) throws IOException {
+ String key = generateKey(context);
+ LOG.info("Discarding all cleanup for TaskAttemptID:" + key);
+ if (!taskCommitters.containsKey(key)) {
+ LOG.warn("No committer registered for TaskAttemptID:" + key);
+ }
+ else {
+ taskCommitters.remove(key);
+ }
+ }
+
+ // Hide constructor, for make benefit glorious Singleton.
+ private TaskCommitContextRegistry() {
+ }
+}