You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by cw...@apache.org on 2014/06/23 12:17:29 UTC
svn commit: r1604731 -
/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/
Author: cws
Date: Mon Jun 23 10:17:29 2014
New Revision: 1604731
URL: http://svn.apache.org/r1604731
Log:
HIVE-7094: Separate out static/dynamic partitioning code in FileRecordWriterContainer (David Chen via cws)
Added:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java?rev=1604731&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java Mon Jun 23 10:17:29 2014
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.hcatalog.common.ErrorType;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+/**
+ * Record writer container for tables using dynamic partitioning. See
+ * {@link FileOutputFormatContainer} for more information
+ */
+class DynamicPartitionFileRecordWriterContainer extends FileRecordWriterContainer {
+ private final List<Integer> dynamicPartCols;
+ private int maxDynamicPartitions;
+
+ private final Map<String, RecordWriter<? super WritableComparable<?>, ? super Writable>> baseDynamicWriters;
+ private final Map<String, SerDe> baseDynamicSerDe;
+ private final Map<String, org.apache.hadoop.mapred.OutputCommitter> baseDynamicCommitters;
+ private final Map<String, org.apache.hadoop.mapred.TaskAttemptContext> dynamicContexts;
+ private final Map<String, ObjectInspector> dynamicObjectInspectors;
+ private Map<String, OutputJobInfo> dynamicOutputJobInfo;
+
+ /**
+  * Creates a container that tracks one underlying writer per dynamic partition.
+  *
+  * @param baseWriter RecordWriter to contain; passed through to the superclass constructor
+  * @param context current TaskAttemptContext
+  * @throws IOException if the superclass constructor fails; an HCatException is raised when the
+  *           job info reports no dynamic partition column positions
+  * @throws InterruptedException if raised by the superclass constructor
+  */
+ public DynamicPartitionFileRecordWriterContainer(
+     RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
+     TaskAttemptContext context) throws IOException, InterruptedException {
+   super(baseWriter, context);
+
+   // Per-partition state. Every map is keyed by the partition key string that
+   // getLocalFileWriter() derives from a record's dynamic partition values.
+   baseDynamicWriters =
+       new HashMap<String, RecordWriter<? super WritableComparable<?>, ? super Writable>>();
+   baseDynamicSerDe = new HashMap<String, SerDe>();
+   baseDynamicCommitters = new HashMap<String, org.apache.hadoop.mapred.OutputCommitter>();
+   dynamicContexts = new HashMap<String, org.apache.hadoop.mapred.TaskAttemptContext>();
+   dynamicObjectInspectors = new HashMap<String, ObjectInspector>();
+   dynamicOutputJobInfo = new HashMap<String, OutputJobInfo>();
+
+   maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
+
+   final List<Integer> dynPartPositions = jobInfo.getPosOfDynPartCols();
+   if (null == dynPartPositions) {
+     throw new HCatException("It seems that setSchema() is not called on "
+         + "HCatOutputFormat. Please make sure that method is called.");
+   }
+   dynamicPartCols = dynPartPositions;
+ }
+
+ /**
+  * Closes every per-partition writer, then commits the task output of each partition whose
+  * committer reports that a commit is needed.
+  *
+  * @param context task attempt context used to build the Reporter handed to each writer
+  * @throws IOException if closing a writer or committing a task fails
+  * @throws InterruptedException as permitted by the overridden method
+  */
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+   final Reporter closeReporter = InternalUtil.createReporter(context);
+
+   // All writers are closed before any committer runs.
+   for (RecordWriter<? super WritableComparable<?>, ? super Writable> partitionWriter
+       : baseDynamicWriters.values()) {
+     partitionWriter.close(closeReporter);
+   }
+
+   for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> committerEntry
+       : baseDynamicCommitters.entrySet()) {
+     final OutputCommitter partitionCommitter = committerEntry.getValue();
+     // The task context stored under the same partition key as the committer.
+     final org.apache.hadoop.mapred.TaskAttemptContext partitionContext =
+         dynamicContexts.get(committerEntry.getKey());
+     if (!partitionCommitter.needsTaskCommit(partitionContext)) {
+       continue;
+     }
+     partitionCommitter.commitTask(partitionContext);
+   }
+ }
+
+ /**
+  * Returns the writer, object inspector, SerDe and job info for the dynamic partition that
+  * {@code value} belongs to, creating and caching them the first time a partition is seen.
+  *
+  * <p>The partition is identified by the string form of the record's dynamic partition column
+  * values, so this must run before partition columns are removed from the record.
+  *
+  * @param value record about to be written
+  * @return holder bundling the per-partition writer state
+  * @throws HCatException if a dynamic partition column value is null, or if opening another
+  *           partition would exceed the configured maximum number of dynamic partitions
+  * @throws IOException if the SerDe, committer or underlying writer cannot be set up
+  */
+ @Override
+ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOException, HCatException {
+   // Calculate which writer to use from the remaining values - this needs to
+   // be done before we delete cols.
+   List<String> dynamicPartValues = new ArrayList<String>(dynamicPartCols.size());
+   for (Integer colToAppend : dynamicPartCols) {
+     Object partValue = value.get(colToAppend);
+     if (partValue == null) {
+       // Fail with a diagnosable error rather than a bare NullPointerException.
+       throw new HCatException("Dynamic partition column at position " + colToAppend
+           + " is null; cannot determine the target partition.");
+     }
+     dynamicPartValues.add(partValue.toString());
+   }
+
+   String dynKey = dynamicPartValues.toString();
+   if (!baseDynamicWriters.containsKey(dynKey)) {
+     // A new writer is about to be added, so the limit is already reached once the number of
+     // open writers equals it. A limit of -1 skips the check.
+     if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() >= maxDynamicPartitions)) {
+       throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
+           "Number of dynamic partitions being created "
+               + "exceeds configured max allowable partitions[" + maxDynamicPartitions
+               + "], increase parameter [" + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+               + "] if needed.");
+     }
+
+     org.apache.hadoop.mapred.TaskAttemptContext currTaskContext =
+         HCatMapRedUtil.createTaskAttemptContext(context);
+     configureDynamicStorageHandler(currTaskContext, dynamicPartValues);
+     OutputJobInfo localJobInfo =
+         HCatBaseOutputFormat.getJobInfo(currTaskContext.getConfiguration());
+
+     // Setup serDe.
+     SerDe currSerDe =
+         ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf());
+     try {
+       InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(),
+           localJobInfo);
+     } catch (SerDeException e) {
+       throw new IOException("Failed to initialize SerDe", e);
+     }
+
+     // create base OutputFormat
+     org.apache.hadoop.mapred.OutputFormat baseOF =
+         ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(),
+             currTaskContext.getJobConf());
+
+     // We are skipping calling checkOutputSpecs() for each partition
+     // As it can throw a FileAlreadyExistsException when more than one
+     // mapper is writing to a partition.
+     // See HCATALOG-490, also to avoid contacting the namenode for each new
+     // FileOutputFormat instance.
+     // In general this should be ok for most FileOutputFormat implementations
+     // but may become an issue for cases when the method is used to perform
+     // other setup tasks.
+
+     // Get Output Committer
+     org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter =
+         currTaskContext.getJobConf().getOutputCommitter();
+
+     // Create currJobContext the latest so it gets all the config changes
+     org.apache.hadoop.mapred.JobContext currJobContext =
+         HCatMapRedUtil.createJobContext(currTaskContext);
+
+     // Set up job.
+     baseOutputCommitter.setupJob(currJobContext);
+
+     // Recreate to refresh jobConf of currTask context.
+     currTaskContext =
+         HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
+             currTaskContext.getTaskAttemptID(), currTaskContext.getProgressible());
+
+     // Set temp location.
+     currTaskContext.getConfiguration().set(
+         "mapred.work.output.dir",
+         new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext)
+             .getWorkPath().toString());
+
+     // Set up task.
+     baseOutputCommitter.setupTask(currTaskContext);
+
+     Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir"));
+     Path childPath =
+         new Path(parentDir, FileOutputFormat.getUniqueFile(currTaskContext, "part", ""));
+
+     RecordWriter baseRecordWriter =
+         baseOF.getRecordWriter(parentDir.getFileSystem(currTaskContext.getConfiguration()),
+             currTaskContext.getJobConf(), childPath.toString(),
+             InternalUtil.createReporter(currTaskContext));
+
+     // Register everything only after setup succeeded, so a failure above leaves no
+     // half-initialised partition entry behind.
+     baseDynamicWriters.put(dynKey, baseRecordWriter);
+     baseDynamicSerDe.put(dynKey, currSerDe);
+     baseDynamicCommitters.put(dynKey, baseOutputCommitter);
+     dynamicContexts.put(dynKey, currTaskContext);
+     dynamicObjectInspectors.put(dynKey,
+         InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
+     // Job info is re-read from the refreshed task context stored for this partition.
+     dynamicOutputJobInfo.put(dynKey,
+         HCatOutputFormat.getJobInfo(currTaskContext.getConfiguration()));
+   }
+
+   return new LocalFileWriter(baseDynamicWriters.get(dynKey), dynamicObjectInspectors.get(dynKey),
+       baseDynamicSerDe.get(dynKey), dynamicOutputJobInfo.get(dynKey));
+ }
+
+ /**
+  * Configures the output storage handler for one dynamic partition by delegating to
+  * {@link HCatOutputFormat#configureOutputStorageHandler}. Called from getLocalFileWriter()
+  * before the writer for a newly seen partition is created.
+  *
+  * @param context job context whose configuration is passed to the delegate
+  * @param dynamicPartVals string values of the record's dynamic partition columns
+  * @throws IOException if the delegate fails
+  */
+ protected void configureDynamicStorageHandler(JobContext context, List<String> dynamicPartVals)
+ throws IOException {
+ HCatOutputFormat.configureOutputStorageHandler(context, dynamicPartVals);
+ }
+}
Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java?rev=1604731&r1=1604730&r2=1604731&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java Mon Jun 23 10:17:29 2014
@@ -95,18 +95,19 @@ class FileOutputFormatContainer extends
// When Dynamic partitioning is used, the RecordWriter instance initialized here isn't used. Can use null.
// (That's because records can't be written until the values of the dynamic partitions are deduced.
// By that time, a new local instance of RecordWriter, with the correct output-path, will be constructed.)
- rw = new FileRecordWriterContainer((org.apache.hadoop.mapred.RecordWriter)null,context);
+ rw = new DynamicPartitionFileRecordWriterContainer(
+ (org.apache.hadoop.mapred.RecordWriter)null, context);
} else {
Path parentDir = new Path(context.getConfiguration().get("mapred.work.output.dir"));
Path childPath = new Path(parentDir,FileOutputFormat.getUniqueName(new JobConf(context.getConfiguration()), "part"));
- rw = new FileRecordWriterContainer(
- getBaseOutputFormat().getRecordWriter(
- parentDir.getFileSystem(context.getConfiguration()),
- new JobConf(context.getConfiguration()),
- childPath.toString(),
- InternalUtil.createReporter(context)),
- context);
+ rw = new StaticPartitionFileRecordWriterContainer(
+ getBaseOutputFormat().getRecordWriter(
+ parentDir.getFileSystem(context.getConfiguration()),
+ new JobConf(context.getConfiguration()),
+ childPath.toString(),
+ InternalUtil.createReporter(context)),
+ context);
}
return rw;
}
Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java?rev=1604731&r1=1604730&r2=1604731&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java Mon Jun 23 10:17:29 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -47,31 +48,19 @@ import org.apache.hive.hcatalog.common.H
import org.apache.hive.hcatalog.data.HCatRecord;
/**
- * Part of the FileOutput*Container classes
- * See {@link FileOutputFormatContainer} for more information
+ * Part of the FileOutput*Container classes. See {@link FileOutputFormatContainer} for more
+ * information.
*/
-class FileRecordWriterContainer extends RecordWriterContainer {
-
- private final HiveStorageHandler storageHandler;
- private final SerDe serDe;
- private final ObjectInspector objectInspector;
-
- private boolean dynamicPartitioningUsed = false;
-
- private final Map<String, org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>> baseDynamicWriters;
- private final Map<String, SerDe> baseDynamicSerDe;
- private final Map<String, org.apache.hadoop.mapred.OutputCommitter> baseDynamicCommitters;
- private final Map<String, org.apache.hadoop.mapred.TaskAttemptContext> dynamicContexts;
- private final Map<String, ObjectInspector> dynamicObjectInspectors;
- private Map<String, OutputJobInfo> dynamicOutputJobInfo;
+abstract class FileRecordWriterContainer extends RecordWriterContainer {
+ protected final HiveStorageHandler storageHandler;
+ protected final SerDe serDe;
+ protected final ObjectInspector objectInspector;
private final List<Integer> partColsToDel;
- private final List<Integer> dynamicPartCols;
- private int maxDynamicPartitions;
- private OutputJobInfo jobInfo;
- private TaskAttemptContext context;
+ protected OutputJobInfo jobInfo;
+ protected TaskAttemptContext context;
/**
* @param baseWriter RecordWriter to contain
@@ -79,13 +68,16 @@ class FileRecordWriterContainer extends
* @throws IOException
* @throws InterruptedException
*/
- public FileRecordWriterContainer(org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
- TaskAttemptContext context) throws IOException, InterruptedException {
+ public FileRecordWriterContainer(
+ RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
+ TaskAttemptContext context) throws IOException, InterruptedException {
super(context, baseWriter);
this.context = context;
jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration());
- storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
+ storageHandler =
+ HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo()
+ .getStorerInfo());
serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration());
objectInspector = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema());
try {
@@ -96,30 +88,9 @@ class FileRecordWriterContainer extends
// If partition columns occur in data, we want to remove them.
partColsToDel = jobInfo.getPosOfPartCols();
- dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed();
- dynamicPartCols = jobInfo.getPosOfDynPartCols();
- maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
-
- if ((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))) {
- throw new HCatException("It seems that setSchema() is not called on " +
- "HCatOutputFormat. Please make sure that method is called.");
- }
-
-
- if (!dynamicPartitioningUsed) {
- this.baseDynamicSerDe = null;
- this.baseDynamicWriters = null;
- this.baseDynamicCommitters = null;
- this.dynamicContexts = null;
- this.dynamicObjectInspectors = null;
- this.dynamicOutputJobInfo = null;
- } else {
- this.baseDynamicSerDe = new HashMap<String, SerDe>();
- this.baseDynamicWriters = new HashMap<String, org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>>();
- this.baseDynamicCommitters = new HashMap<String, org.apache.hadoop.mapred.OutputCommitter>();
- this.dynamicContexts = new HashMap<String, org.apache.hadoop.mapred.TaskAttemptContext>();
- this.dynamicObjectInspectors = new HashMap<String, ObjectInspector>();
- this.dynamicOutputJobInfo = new HashMap<String, OutputJobInfo>();
+ if (partColsToDel == null) {
+ throw new HCatException("It seems that setSchema() is not called on "
+ + "HCatOutputFormat. Please make sure that method is called.");
}
}
@@ -130,138 +101,59 @@ class FileRecordWriterContainer extends
return storageHandler;
}
- @Override
- public void close(TaskAttemptContext context) throws IOException,
- InterruptedException {
- Reporter reporter = InternalUtil.createReporter(context);
- if (dynamicPartitioningUsed) {
- for (org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()) {
- //We are in RecordWriter.close() make sense that the context would be TaskInputOutput
- bwriter.close(reporter);
- }
- for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry : baseDynamicCommitters.entrySet()) {
- org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
- OutputCommitter baseOutputCommitter = entry.getValue();
- if (baseOutputCommitter.needsTaskCommit(currContext)) {
- baseOutputCommitter.commitTask(currContext);
- }
- }
- } else {
- getBaseRecordWriter().close(reporter);
- }
- }
+ abstract protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOException,
+ HCatException;
@Override
public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
- InterruptedException {
-
- org.apache.hadoop.mapred.RecordWriter localWriter;
- ObjectInspector localObjectInspector;
- SerDe localSerDe;
- OutputJobInfo localJobInfo = null;
-
- if (dynamicPartitioningUsed) {
- // calculate which writer to use from the remaining values - this needs to be done before we delete cols
- List<String> dynamicPartValues = new ArrayList<String>();
- for (Integer colToAppend : dynamicPartCols) {
- dynamicPartValues.add(value.get(colToAppend).toString());
- }
-
- String dynKey = dynamicPartValues.toString();
- if (!baseDynamicWriters.containsKey(dynKey)) {
- if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) {
- throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
- "Number of dynamic partitions being created "
- + "exceeds configured max allowable partitions["
- + maxDynamicPartitions
- + "], increase parameter ["
- + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
- + "] if needed.");
- }
-
- org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context);
- configureDynamicStorageHandler(currTaskContext, dynamicPartValues);
- localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext.getConfiguration());
-
- //setup serDe
- SerDe currSerDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf());
- try {
- InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(), localJobInfo);
- } catch (SerDeException e) {
- throw new IOException("Failed to initialize SerDe", e);
- }
-
- //create base OutputFormat
- org.apache.hadoop.mapred.OutputFormat baseOF =
- ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf());
-
- //We are skipping calling checkOutputSpecs() for each partition
- //As it can throw a FileAlreadyExistsException when more than one mapper is writing to a partition
- //See HCATALOG-490, also to avoid contacting the namenode for each new FileOutputFormat instance
- //In general this should be ok for most FileOutputFormat implementations
- //but may become an issue for cases when the method is used to perform other setup tasks
-
- //get Output Committer
- org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = currTaskContext.getJobConf().getOutputCommitter();
- //create currJobContext the latest so it gets all the config changes
- org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currTaskContext);
- //setupJob()
- baseOutputCommitter.setupJob(currJobContext);
- //recreate to refresh jobConf of currTask context
- currTaskContext =
- HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
- currTaskContext.getTaskAttemptID(),
- currTaskContext.getProgressible());
- //set temp location
- currTaskContext.getConfiguration().set("mapred.work.output.dir",
- new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext).getWorkPath().toString());
- //setupTask()
- baseOutputCommitter.setupTask(currTaskContext);
-
- Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir"));
- Path childPath = new Path(parentDir,FileOutputFormat.getUniqueFile(currTaskContext, "part", ""));
-
- org.apache.hadoop.mapred.RecordWriter baseRecordWriter =
- baseOF.getRecordWriter(
- parentDir.getFileSystem(currTaskContext.getConfiguration()),
- currTaskContext.getJobConf(),
- childPath.toString(),
- InternalUtil.createReporter(currTaskContext));
-
- baseDynamicWriters.put(dynKey, baseRecordWriter);
- baseDynamicSerDe.put(dynKey, currSerDe);
- baseDynamicCommitters.put(dynKey, baseOutputCommitter);
- dynamicContexts.put(dynKey, currTaskContext);
- dynamicObjectInspectors.put(dynKey, InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
- dynamicOutputJobInfo.put(dynKey, HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey).getConfiguration()));
- }
-
- localJobInfo = dynamicOutputJobInfo.get(dynKey);
- localWriter = baseDynamicWriters.get(dynKey);
- localSerDe = baseDynamicSerDe.get(dynKey);
- localObjectInspector = dynamicObjectInspectors.get(dynKey);
- } else {
- localJobInfo = jobInfo;
- localWriter = getBaseRecordWriter();
- localSerDe = serDe;
- localObjectInspector = objectInspector;
- }
+ InterruptedException {
+ LocalFileWriter localFileWriter = getLocalFileWriter(value);
+ RecordWriter localWriter = localFileWriter.getLocalWriter();
+ ObjectInspector localObjectInspector = localFileWriter.getLocalObjectInspector();
+ SerDe localSerDe = localFileWriter.getLocalSerDe();
+ OutputJobInfo localJobInfo = localFileWriter.getLocalJobInfo();
for (Integer colToDel : partColsToDel) {
value.remove(colToDel);
}
-
- //The key given by user is ignored
+ // The key given by user is ignored
try {
- localWriter.write(NullWritable.get(), localSerDe.serialize(value.getAll(), localObjectInspector));
+ localWriter.write(NullWritable.get(),
+ localSerDe.serialize(value.getAll(), localObjectInspector));
} catch (SerDeException e) {
throw new IOException("Failed to serialize object", e);
}
}
- protected void configureDynamicStorageHandler(JobContext context, List<String> dynamicPartVals) throws IOException {
- HCatOutputFormat.configureOutputStorageHandler(context, dynamicPartVals);
- }
+ /**
+  * Immutable holder for the writer, object inspector, SerDe and job info that apply to a single
+  * record. Declared static because it uses no state of the enclosing container, so instances
+  * do not carry a reference to it.
+  */
+ static class LocalFileWriter {
+   private final RecordWriter localWriter;
+   private final ObjectInspector localObjectInspector;
+   private final SerDe localSerDe;
+   private final OutputJobInfo localJobInfo;
+
+   /**
+    * @param localWriter writer the record is written to
+    * @param localObjectInspector object inspector passed to the SerDe when serializing
+    * @param localSerDe SerDe used to serialize the record
+    * @param localJobInfo output job info associated with the writer
+    */
+   public LocalFileWriter(RecordWriter localWriter, ObjectInspector localObjectInspector,
+       SerDe localSerDe, OutputJobInfo localJobInfo) {
+     this.localWriter = localWriter;
+     this.localObjectInspector = localObjectInspector;
+     this.localSerDe = localSerDe;
+     this.localJobInfo = localJobInfo;
+   }
+
+   /** @return the writer supplied at construction */
+   public RecordWriter getLocalWriter() {
+     return localWriter;
+   }
+
+   /** @return the object inspector supplied at construction */
+   public ObjectInspector getLocalObjectInspector() {
+     return localObjectInspector;
+   }
+
+   /** @return the SerDe supplied at construction */
+   public SerDe getLocalSerDe() {
+     return localSerDe;
+   }
+
+   /** @return the output job info supplied at construction */
+   public OutputJobInfo getLocalJobInfo() {
+     return localJobInfo;
+   }
+ }
}
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java?rev=1604731&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java Mon Jun 23 10:17:29 2014
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hive.hcatalog.mapreduce.FileRecordWriterContainer;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+/**
+ * Record writer container for tables using static partitioning: every record is written through
+ * the writer returned by getBaseRecordWriter(). See {@link FileOutputFormatContainer} for more
+ * information.
+ */
+class StaticPartitionFileRecordWriterContainer extends FileRecordWriterContainer {
+  /**
+   * @param baseWriter RecordWriter to contain
+   * @param context current TaskAttemptContext
+   * @throws IOException if raised by the superclass constructor
+   * @throws InterruptedException if raised by the superclass constructor
+   */
+  public StaticPartitionFileRecordWriterContainer(
+      RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    super(baseWriter, context);
+  }
+
+  /**
+   * Closes the base record writer, passing it a Reporter created from {@code context}.
+   */
+  @Override
+  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+    final Reporter closeReporter = InternalUtil.createReporter(context);
+    getBaseRecordWriter().close(closeReporter);
+  }
+
+  /**
+   * Returns a holder built from the base writer and this container's own object inspector,
+   * SerDe and job info; the record itself is not consulted.
+   */
+  @Override
+  protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOException, HCatException {
+    return new LocalFileWriter(
+        getBaseRecordWriter(),
+        objectInspector,
+        serDe,
+        jobInfo);
+  }
+}