Posted to hcatalog-commits@incubator.apache.org by kh...@apache.org on 2011/07/22 23:38:14 UTC
svn commit: r1149763 [1/2] - in /incubator/hcatalog/trunk: ./
src/java/org/apache/hcatalog/common/ src/java/org/apache/hcatalog/har/
src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/pig/
src/java/org/apache/hcatalog/rcfile/ src/test...
Author: khorgath
Date: Fri Jul 22 23:38:07 2011
New Revision: 1149763
URL: http://svn.apache.org/viewvc?rev=1149763&view=rev
Log:
HCATALOG-42 Dynamic Partitioning
Added:
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
Modified:
incubator/hcatalog/trunk/build.xml
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
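For context before the diffs: this change lets a job leave some (or all) partition key values unspecified when calling HCatOutputFormat.setOutput; the unspecified keys become "dynamic partitioning keys", their values are read from the records being written, and the output committer discovers and publishes the resulting partitions. The following is a minimal, hypothetical driver sketch of that usage. It is written against the API surface visible in this diff (HCatOutputFormat.setOutput, getTableSchema, setSchema, HCatTableInfo); the getOutputTableInfo factory signature, metastore URI, and the table/column names (employee, emp_country, emp_state, borrowed from a path example in the HCatOutputCommitter changes) are assumptions, not part of this commit.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.HCatOutputFormat;
    import org.apache.hcatalog.mapreduce.HCatTableInfo;

    public class DynamicPartitionWriteExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "hcat-dynamic-partition-write");

        // Only part of the partition spec is given up front: emp_country is static,
        // emp_state is omitted and so becomes a dynamic partitioning key. Its values
        // must therefore appear as a column in the HCatRecords the job emits.
        Map<String, String> partitionSpec = new HashMap<String, String>();
        partitionSpec.put("emp_country", "IN");

        // getOutputTableInfo(serverUri, serverKerberosPrincipal, db, table, partitionValues)
        // is assumed here; adjust to the HCatTableInfo factory available in your build.
        HCatOutputFormat.setOutput(job, HCatTableInfo.getOutputTableInfo(
            "thrift://localhost:9083", null, "default", "employee", partitionSpec));

        // The schema handed to setSchema should still contain the dynamic partition
        // column(s); per the setPartDetails changes in HCatBaseOutputFormat below,
        // HCatalog removes them from the stored data and uses them to route records.
        HCatSchema schema = HCatOutputFormat.getTableSchema(job);
        HCatOutputFormat.setSchema(job, schema);

        job.setOutputFormatClass(HCatOutputFormat.class);
        // ... configure input format, mapper, and key/value classes as usual, then:
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

From Pig, the changes to HCatStorer in this same commit suggest the equivalent usage is storing with a partially specified (or empty) partition spec while the remaining partition columns are present in the relation being stored.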
Modified: incubator/hcatalog/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/build.xml?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/build.xml (original)
+++ incubator/hcatalog/trunk/build.xml Fri Jul 22 23:38:07 2011
@@ -107,6 +107,7 @@
<fileset dir="${hive.root}/build/ivy/lib/default" includes="jdo2-api-2.3-ec.jar"/>
<fileset dir="${hive.root}/build/ivy/lib/default" includes="datanucleus-enhancer-2.0.3.jar"/>
<fileset dir="${hive.root}/build/ivy/lib/default" includes="datanucleus-core-2.0.3.jar"/>
+ <fileset dir="${lib.dir}" includes="hadoop_archive-0.3.1.jar"/>
<fileset dir="${hive.root}/lib" includes="asm-3.1.jar"/>
</path>
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java Fri Jul 22 23:38:07 2011
@@ -41,7 +41,7 @@ public enum ErrorType {
ERROR_INVALID_PARTITION_VALUES (2010, "Invalid partition values specified"),
ERROR_MISSING_PARTITION_KEY (2011, "Partition key value not provided for publish"),
ERROR_MOVE_FAILED (2012, "Moving of data failed during commit"),
-
+ ERROR_TOO_MANY_DYNAMIC_PTNS (2013, "Attempt to create too many dynamic partitions"),
/* Authorization Errors 3000 - 3999 */
ERROR_ACCESS_CONTROL (3000, "Permission denied"),
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java Fri Jul 22 23:38:07 2011
@@ -64,13 +64,18 @@ public final class HCatConstants {
public static final String HCAT_KEY_OUTPUT_INFO = HCAT_KEY_OUTPUT_BASE + ".info";
public static final String HCAT_KEY_HIVE_CONF = HCAT_KEY_OUTPUT_BASE + ".hive.conf";
public static final String HCAT_KEY_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".token.sig";
-
+ public static final String HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".jobclient.token.sig";
+ public static final String HCAT_KEY_JOBCLIENT_TOKEN_STRFORM = HCAT_KEY_OUTPUT_BASE + ".jobclient.token.strform";
+
public static final String HCAT_MSG_CLEAN_FREQ = "hcat.msg.clean.freq";
public static final String HCAT_MSG_EXPIRY_DURATION = "hcat.msg.expiry.duration";
public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name";
public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = "hcat.msgbus.topic.naming.policy";
public static final String HCAT_MSGBUS_TOPIC_PREFIX = "hcat.msgbus.topic.prefix";
+
+ public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + "dynamic.jobid";
+ public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false;
// Message Bus related properties.
public static final String HCAT_DEFAULT_TOPIC_PREFIX = "hcat";
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java Fri Jul 22 23:38:07 2011
@@ -28,21 +28,42 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
public class HCatUtil {
+// static final private Log LOG = LogFactory.getLog(HCatUtil.class);
+
public static boolean checkJobContextIfRunningFromBackend(JobContext j){
if (j.getConfiguration().get("mapred.task.id", "").equals("")){
return false;
@@ -256,4 +277,102 @@ public class HCatUtil {
return true;
}
+ public static Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> getJobTrackerDelegationToken(Configuration conf, String userName) throws Exception {
+// LOG.info("getJobTrackerDelegationToken("+conf+","+userName+")");
+ JobClient jcl = new JobClient(new JobConf(conf, HCatOutputFormat.class));
+ Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = jcl.getDelegationToken(new Text(userName));
+// LOG.info("got "+t);
+ return t;
+
+// return null;
+ }
+
+ public static void cancelJobTrackerDelegationToken(String tokenStrForm, String tokenSignature) throws Exception {
+// LOG.info("cancelJobTrackerDelegationToken("+tokenStrForm+","+tokenSignature+")");
+ JobClient jcl = new JobClient(new JobConf(new Configuration(), HCatOutputFormat.class));
+ Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = extractJobTrackerToken(tokenStrForm,tokenSignature);
+// LOG.info("canceling "+t);
+ try {
+ jcl.cancelDelegationToken(t);
+ }catch(Exception e){
+// HCatUtil.logToken(LOG, "jcl token to cancel", t);
+ // ignore if token has already been invalidated.
+ }
+ }
+
+
+ public static Token<? extends AbstractDelegationTokenIdentifier>
+ extractThriftToken(String tokenStrForm, String tokenSignature) throws MetaException, TException, IOException {
+// LOG.info("extractThriftToken("+tokenStrForm+","+tokenSignature+")");
+ Token<? extends AbstractDelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
+ t.decodeFromUrlString(tokenStrForm);
+ t.setService(new Text(tokenSignature));
+// LOG.info("returning "+t);
+ return t;
+ }
+
+ public static Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier>
+ extractJobTrackerToken(String tokenStrForm, String tokenSignature) throws MetaException, TException, IOException {
+// LOG.info("extractJobTrackerToken("+tokenStrForm+","+tokenSignature+")");
+ Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t =
+ new Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier>();
+ t.decodeFromUrlString(tokenStrForm);
+ t.setService(new Text(tokenSignature));
+// LOG.info("returning "+t);
+ return t;
+ }
+
+ /**
+ * Logging stack trace
+ * @param logger
+ */
+ public static void logStackTrace(Log logger) {
+ StackTraceElement[] stackTrace = new Exception().getStackTrace();
+ for (int i = 1 ; i < stackTrace.length ; i++){
+ logger.info("\t"+stackTrace[i].toString());
+ }
+ }
+
+ /**
+ * debug log the hive conf
+ * @param logger
+ * @param hc
+ */
+ public static void logHiveConf(Log logger, HiveConf hc){
+ logEntrySet(logger,"logging hiveconf:",hc.getAllProperties().entrySet());
+ }
+
+
+ public static void logList(Log logger, String itemName, List<? extends Object> list){
+ logger.info(itemName+":");
+ for (Object item : list){
+ logger.info("\t["+item+"]");
+ }
+ }
+
+ public static void logMap(Log logger, String itemName, Map<? extends Object,? extends Object> map){
+ logEntrySet(logger,itemName,map.entrySet());
+ }
+
+ public static void logEntrySet(Log logger, String itemName, Set<? extends Entry> entrySet) {
+ logger.info(itemName+":");
+ for (Entry e : entrySet){
+ logger.info("\t["+e.getKey()+"]=>["+e.getValue()+"]");
+ }
+ }
+
+ public static void logAllTokens(Log logger, JobContext context) throws IOException {
+ for (Token<? extends TokenIdentifier>t : context.getCredentials().getAllTokens()){
+ logToken(logger,"token",t);
+ }
+ }
+
+ public static void logToken(Log logger, String itemName, Token<? extends TokenIdentifier> t) throws IOException {
+ logger.info(itemName+":");
+ logger.info("\tencodeToUrlString : "+t.encodeToUrlString());
+ logger.info("\ttoString : "+t.toString());
+ logger.info("\tkind : "+t.getKind());
+ logger.info("\tservice : "+t.getService());
+ }
+
}
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java?rev=1149763&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java Fri Jul 22 23:38:07 2011
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.har;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Constants;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.tools.HadoopArchives;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+
+public class HarOutputCommitterPostProcessor {
+
+// static final private Log LOG = LogFactory.getLog(HarOutputCommitterPostProcessor.class);
+
+ boolean isEnabled = false;
+
+ public boolean isEnabled() {
+ return isEnabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.isEnabled = enabled;
+ }
+
+
+ public void exec(JobContext context, Partition partition, Path partPath) throws IOException {
+// LOG.info("Archiving partition ["+partPath.toString()+"]");
+ makeHar(context,partPath.toUri().toString(),harFile(partPath));
+ partition.getParameters().put(Constants.IS_ARCHIVED, "true");
+ }
+
+ public String harFile(Path ptnPath) throws IOException{
+ String harFile = ptnPath.toString().replaceFirst("/+$", "") + ".har";
+// LOG.info("har file : " + harFile);
+ return harFile;
+ }
+
+ public String getParentFSPath(Path ptnPath) throws IOException {
+ return ptnPath.toUri().getPath().replaceFirst("/+$", "");
+ }
+
+ public String getProcessedLocation(Path ptnPath) throws IOException {
+ String harLocn = ("har://" + ptnPath.toUri().getPath()).replaceFirst("/+$", "") + ".har" + Path.SEPARATOR;
+// LOG.info("har location : " + harLocn);
+ return harLocn;
+ }
+
+
+ /**
+ * Creates a har file from the contents of a given directory, using that as root.
+ * @param dir Directory to archive
+ * @param harName The HAR file to create
+ */
+ public static void makeHar(JobContext context, String dir, String harFile) throws IOException{
+// Configuration conf = context.getConfiguration();
+// Credentials creds = context.getCredentials();
+
+// HCatUtil.logAllTokens(LOG,context);
+
+ int lastSep = harFile.lastIndexOf(Path.SEPARATOR_CHAR);
+ Path archivePath = new Path(harFile.substring(0,lastSep));
+ final String[] args = {
+ "-archiveName",
+ harFile.substring(lastSep+1, harFile.length()),
+ "-p",
+ dir,
+ "*",
+ archivePath.toString()
+ };
+// for (String arg : args){
+// LOG.info("Args to har : "+ arg);
+// }
+ try {
+ Configuration newConf = new Configuration();
+ FileSystem fs = archivePath.getFileSystem(newConf);
+
+ newConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
+// LOG.info("System.getenv(\"HADOOP_TOKEN_FILE_LOCATION\") =["+ System.getenv("HADOOP_TOKEN_FILE_LOCATION")+"]");
+
+// for (FileStatus ds : fs.globStatus(new Path(dir, "*"))){
+// LOG.info("src : "+ds.getPath().toUri().toString());
+// }
+
+ final HadoopArchives har = new HadoopArchives(newConf);
+ int rc = ToolRunner.run(har, args);
+ if (rc!= 0){
+ throw new Exception("Har returned error code "+rc);
+ }
+
+// for (FileStatus hs : fs.globStatus(new Path(harFile, "*"))){
+// LOG.info("dest : "+hs.getPath().toUri().toString());
+// }
+// doHarCheck(fs,harFile);
+// LOG.info("Nuking " + dir);
+ fs.delete(new Path(dir), true);
+ } catch (Exception e){
+ throw new HCatException("Error creating Har ["+harFile+"] from ["+dir+"]", e);
+ }
+ }
+
+}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java Fri Jul 22 23:38:07 2011
@@ -34,7 +34,7 @@ public abstract class HCatBaseOutputComm
/** The underlying output committer */
protected final OutputCommitter baseCommitter;
- public HCatBaseOutputCommitter(OutputCommitter baseCommitter) {
+ public HCatBaseOutputCommitter(JobContext context, OutputCommitter baseCommitter) {
this.baseCommitter = baseCommitter;
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -20,9 +20,14 @@ package org.apache.hcatalog.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
@@ -36,6 +41,8 @@ import org.apache.hcatalog.data.schema.H
public abstract class HCatBaseOutputFormat extends OutputFormat<WritableComparable<?>, HCatRecord> {
+// static final private Log LOG = LogFactory.getLog(HCatBaseOutputFormat.class);
+
/**
* Gets the table schema for the table specified in the HCatOutputFormat.setOutput call
* on the specified job context.
@@ -83,7 +90,7 @@ public abstract class HCatBaseOutputForm
* @return the OutputJobInfo object
* @throws IOException the IO exception
*/
- static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException {
+ public static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException {
String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
if( jobString == null ) {
throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED);
@@ -102,33 +109,107 @@ public abstract class HCatBaseOutputForm
@SuppressWarnings("unchecked")
static HCatOutputStorageDriver getOutputDriverInstance(
JobContext jobContext, OutputJobInfo jobInfo) throws IOException {
+ return getOutputDriverInstance(jobContext,jobInfo,(List<String>)null);
+ }
+
+ /**
+ * Gets the output storage driver instance, with allowing specification of missing dynamic partvals
+ * @param jobContext the job context
+ * @param jobInfo the output job info
+ * @return the output driver instance
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ static HCatOutputStorageDriver getOutputDriverInstance(
+ JobContext jobContext, OutputJobInfo jobInfo, List<String> dynamicPartVals) throws IOException {
try {
Class<? extends HCatOutputStorageDriver> driverClass =
(Class<? extends HCatOutputStorageDriver>)
Class.forName(jobInfo.getStorerInfo().getOutputSDClass());
HCatOutputStorageDriver driver = driverClass.newInstance();
+ Map<String, String> partitionValues = jobInfo.getTableInfo().getPartitionValues();
+ String location = jobInfo.getLocation();
+
+ if (dynamicPartVals != null){
+ // dynamic part vals specified
+ List<String> dynamicPartKeys = jobInfo.getTableInfo().getDynamicPartitioningKeys();
+ if (dynamicPartVals.size() != dynamicPartKeys.size()){
+ throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
+ "Unable to instantiate dynamic partitioning storage driver, mismatch between"
+ + " number of partition values obtained["+dynamicPartVals.size()
+ + "] and number of partition values required["+dynamicPartKeys.size()+"]");
+ }
+ for (int i = 0; i < dynamicPartKeys.size(); i++){
+ partitionValues.put(dynamicPartKeys.get(i), dynamicPartVals.get(i));
+ }
+
+ // re-home location, now that we know the rest of the partvals
+ Table table = jobInfo.getTable();
+
+ List<String> partitionCols = new ArrayList<String>();
+ for(FieldSchema schema : table.getPartitionKeys()) {
+ partitionCols.add(schema.getName());
+ }
+
+ location = driver.getOutputLocation(jobContext,
+ table.getSd().getLocation() , partitionCols,
+ partitionValues,jobContext.getConfiguration().get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID));
+ }
+
//Initialize the storage driver
driver.setSchema(jobContext, jobInfo.getOutputSchema());
- driver.setPartitionValues(jobContext, jobInfo.getTableInfo().getPartitionValues());
- driver.setOutputPath(jobContext, jobInfo.getLocation());
+ driver.setPartitionValues(jobContext, partitionValues);
+ driver.setOutputPath(jobContext, location);
+
+// HCatUtil.logMap(LOG,"Setting outputPath ["+location+"] for ",partitionValues);
driver.initialize(jobContext, jobInfo.getStorerInfo().getProperties());
return driver;
} catch(Exception e) {
+ if (e instanceof HCatException){
+ throw (HCatException)e;
+ }else{
throw new HCatException(ErrorType.ERROR_INIT_STORAGE_DRIVER, e);
+ }
}
}
+ /**
+ * Gets the output storage driver instance, with allowing specification
+ * of partvals from which it picks the dynamic partvals
+ * @param jobContext the job context
+ * @param jobInfo the output job info
+ * @return the output driver instance
+ * @throws IOException
+ */
+
+ protected static HCatOutputStorageDriver getOutputDriverInstance(
+ JobContext context, OutputJobInfo jobInfo,
+ Map<String, String> fullPartSpec) throws IOException {
+ List<String> dynamicPartKeys = jobInfo.getTableInfo().getDynamicPartitioningKeys();
+ if ((dynamicPartKeys == null)||(dynamicPartKeys.isEmpty())){
+ return getOutputDriverInstance(context,jobInfo,(List<String>)null);
+ }else{
+ List<String> dynKeyVals = new ArrayList<String>();
+ for (String dynamicPartKey : dynamicPartKeys){
+ dynKeyVals.add(fullPartSpec.get(dynamicPartKey));
+ }
+ return getOutputDriverInstance(context,jobInfo,dynKeyVals);
+ }
+ }
+
+
protected static void setPartDetails(OutputJobInfo jobInfo, final HCatSchema schema,
Map<String, String> partMap) throws HCatException, IOException {
List<Integer> posOfPartCols = new ArrayList<Integer>();
+ List<Integer> posOfDynPartCols = new ArrayList<Integer>();
// If partition columns occur in data, we want to remove them.
// So, find out positions of partition columns in schema provided by user.
// We also need to update the output Schema with these deletions.
-
+
// Note that, output storage drivers never sees partition columns in data
// or schema.
@@ -140,8 +221,26 @@ public abstract class HCatBaseOutputForm
schemaWithoutParts.remove(schema.get(partKey));
}
}
+
+ // Also, if dynamic partitioning is being used, we want to
+ // set appropriate list of columns for the columns to be dynamically specified.
+ // These would be partition keys too, so would also need to be removed from
+ // output schema and partcols
+
+ if (jobInfo.getTableInfo().isDynamicPartitioningUsed()){
+ for (String partKey : jobInfo.getTableInfo().getDynamicPartitioningKeys()){
+ Integer idx;
+ if((idx = schema.getPosition(partKey)) != null){
+ posOfPartCols.add(idx);
+ posOfDynPartCols.add(idx);
+ schemaWithoutParts.remove(schema.get(partKey));
+ }
+ }
+ }
+
HCatUtil.validatePartitionSchema(jobInfo.getTable(), schemaWithoutParts);
jobInfo.setPosOfPartCols(posOfPartCols);
+ jobInfo.setPosOfDynPartCols(posOfDynPartCols);
jobInfo.setOutputSchema(schemaWithoutParts);
}
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java Fri Jul 22 23:38:07 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.parse.E
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatException;
@@ -47,8 +48,8 @@ public class HCatEximOutputCommitter ext
private static final Log LOG = LogFactory.getLog(HCatEximOutputCommitter.class);
- public HCatEximOutputCommitter(OutputCommitter baseCommitter) {
- super(baseCommitter);
+ public HCatEximOutputCommitter(JobContext context, OutputCommitter baseCommitter) {
+ super(context,baseCommitter);
}
@Override
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -91,7 +91,7 @@ public class HCatEximOutputFormat extend
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
- return new HCatEximOutputCommitter(outputFormat.getOutputCommitter(context));
+ return new HCatEximOutputCommitter(context,outputFormat.getOutputCommitter(context));
}
public static void setOutput(Job job, String dbname, String tablename, String location,
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java Fri Jul 22 23:38:07 2011
@@ -21,26 +21,35 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Constants;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatConstants;
@@ -49,49 +58,105 @@ import org.apache.hcatalog.common.HCatUt
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.har.HarOutputCommitterPostProcessor;
import org.apache.thrift.TException;
public class HCatOutputCommitter extends OutputCommitter {
+// static final private Log LOG = LogFactory.getLog(HCatOutputCommitter.class);
+
/** The underlying output committer */
private final OutputCommitter baseCommitter;
- public HCatOutputCommitter(OutputCommitter baseCommitter) {
+ private final boolean dynamicPartitioningUsed;
+ private boolean partitionsDiscovered;
+
+ private Map<String, Map<String, String>> partitionsDiscoveredByPath;
+ private Map<String, HCatOutputStorageDriver> storageDriversDiscoveredByPath;
+
+ HarOutputCommitterPostProcessor harProcessor = new HarOutputCommitterPostProcessor();
+
+ private String ptnRootLocation = null;
+
+ public HCatOutputCommitter(JobContext context, OutputCommitter baseCommitter) throws IOException {
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+ dynamicPartitioningUsed = jobInfo.getTableInfo().isDynamicPartitioningUsed();
+ if (!dynamicPartitioningUsed){
this.baseCommitter = baseCommitter;
+ this.partitionsDiscovered = true;
+ }else{
+ this.baseCommitter = null;
+ this.partitionsDiscovered = false;
+ }
}
@Override
public void abortTask(TaskAttemptContext context) throws IOException {
+ if (!dynamicPartitioningUsed){
baseCommitter.abortTask(context);
+ }
}
@Override
public void commitTask(TaskAttemptContext context) throws IOException {
+ if (!dynamicPartitioningUsed){
baseCommitter.commitTask(context);
+ }else{
+ // called explicitly through HCatRecordWriter.close() if dynamic
+ }
}
@Override
public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+ if (!dynamicPartitioningUsed){
return baseCommitter.needsTaskCommit(context);
+ }else{
+ // called explicitly through HCatRecordWriter.close() if dynamic - return false by default
+ return false;
+ }
}
@Override
public void setupJob(JobContext context) throws IOException {
- if( baseCommitter != null ) {
- baseCommitter.setupJob(context);
- }
+ if( baseCommitter != null ) {
+ baseCommitter.setupJob(context);
+ }
+ // in dynamic usecase, called through HCatRecordWriter
}
@Override
public void setupTask(TaskAttemptContext context) throws IOException {
+ if (!dynamicPartitioningUsed){
baseCommitter.setupTask(context);
- }
+ }else{
+ // called explicitly through HCatRecordWriter.write() if dynamic
+ }
+ }
@Override
public void abortJob(JobContext jobContext, State state) throws IOException {
+
+ if (dynamicPartitioningUsed){
+ discoverPartitions(jobContext);
+ }
+
if(baseCommitter != null) {
baseCommitter.abortJob(jobContext, state);
+ }else{
+ if (dynamicPartitioningUsed){
+ for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){
+ try {
+ baseOsd.abortOutputCommitterJob(
+ new TaskAttemptContext(
+ jobContext.getConfiguration(), TaskAttemptID.forName(ptnRootLocation)
+ ),state);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
}
+
OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
try {
@@ -106,6 +171,13 @@ public class HCatOutputCommitter extends
(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
client.cancelDelegationToken(tokenStrForm);
}
+
+ String jcTokenStrForm = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+ String jcTokenSignature = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
+ if(jcTokenStrForm != null && jcTokenSignature != null) {
+ HCatUtil.cancelJobTrackerDelegationToken(tokenStrForm,jcTokenSignature);
+ }
+
} catch(Exception e) {
if( e instanceof HCatException ) {
throw (HCatException) e;
@@ -114,8 +186,16 @@ public class HCatOutputCommitter extends
}
}
- Path src = new Path(jobInfo.getLocation());
+ Path src;
+ if (dynamicPartitioningUsed){
+ src = new Path(getPartitionRootLocation(
+ jobInfo.getLocation().toString(),jobInfo.getTable().getPartitionKeysSize()
+ ));
+ }else{
+ src = new Path(jobInfo.getLocation());
+ }
FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
+// LOG.warn("abortJob about to delete ["+src.toString() +"]");
fs.delete(src, true);
}
@@ -130,6 +210,10 @@ public class HCatOutputCommitter extends
@Override
public void commitJob(JobContext jobContext) throws IOException {
+ if (dynamicPartitioningUsed){
+ discoverPartitions(jobContext);
+ }
+
if(baseCommitter != null) {
baseCommitter.commitJob(jobContext);
}
@@ -153,12 +237,15 @@ public class HCatOutputCommitter extends
@Override
public void cleanupJob(JobContext context) throws IOException {
+ if (dynamicPartitioningUsed){
+ discoverPartitions(context);
+ }
+
OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
Configuration conf = context.getConfiguration();
Table table = jobInfo.getTable();
- StorageDescriptor tblSD = table.getSd();
- Path tblPath = new Path(tblSD.getLocation());
+ Path tblPath = new Path(table.getSd().getLocation());
FileSystem fs = tblPath.getFileSystem(conf);
if( table.getPartitionKeys().size() == 0 ) {
@@ -166,75 +253,116 @@ public class HCatOutputCommitter extends
if( baseCommitter != null ) {
baseCommitter.cleanupJob(context);
+ }else{
+ if (dynamicPartitioningUsed){
+ for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){
+ try {
+ baseOsd.cleanupOutputCommitterJob(
+ new TaskAttemptContext(
+ context.getConfiguration(), TaskAttemptID.forName(ptnRootLocation)
+ ));
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
}
-
+
//Move data from temp directory the actual table directory
//No metastore operation required.
Path src = new Path(jobInfo.getLocation());
- moveTaskOutputs(fs, src, src, tblPath);
+ moveTaskOutputs(fs, src, src, tblPath,false);
fs.delete(src, true);
return;
}
HiveMetaStoreClient client = null;
List<String> values = null;
- boolean partitionAdded = false;
HCatTableInfo tableInfo = jobInfo.getTableInfo();
+ List<Partition> partitionsAdded = new ArrayList<Partition>();
+
try {
client = HCatOutputFormat.createHiveClient(tableInfo.getServerUri(), conf);
StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters());
- Partition partition = new Partition();
- partition.setDbName(tableInfo.getDatabaseName());
- partition.setTableName(tableInfo.getTableName());
- partition.setSd(new StorageDescriptor(tblSD));
- partition.getSd().setLocation(jobInfo.getLocation());
-
updateTableSchema(client, table, jobInfo.getOutputSchema());
+
+ FileStatus tblStat = fs.getFileStatus(tblPath);
+ String grpName = tblStat.getGroup();
+ FsPermission perms = tblStat.getPermission();
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- for(HCatFieldSchema fieldSchema : jobInfo.getOutputSchema().getFields()) {
- fields.add(HCatSchemaUtils.getFieldSchema(fieldSchema));
+ List<Partition> partitionsToAdd = new ArrayList<Partition>();
+ if (!dynamicPartitioningUsed){
+ partitionsToAdd.add(
+ constructPartition(
+ context,
+ tblPath.toString(), tableInfo.getPartitionValues()
+ ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+ ,table, fs
+ ,grpName,perms));
+ }else{
+ for (Entry<String,Map<String,String>> entry : partitionsDiscoveredByPath.entrySet()){
+ partitionsToAdd.add(
+ constructPartition(
+ context,
+ getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue()
+ ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+ ,table, fs
+ ,grpName,perms));
+ }
}
- partition.getSd().setCols(fields);
-
- Map<String,String> partKVs = tableInfo.getPartitionValues();
- //Get partition value list
- partition.setValues(getPartitionValueList(table,partKVs));
-
- Map<String, String> params = new HashMap<String, String>();
- params.put(HCatConstants.HCAT_ISD_CLASS, storer.getInputSDClass());
- params.put(HCatConstants.HCAT_OSD_CLASS, storer.getOutputSDClass());
+ //Publish the new partition(s)
+ if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
+
+ Path src = new Path(ptnRootLocation);
+
+ // check here for each dir we're copying out, to see if it already exists, error out if so
+ moveTaskOutputs(fs, src, src, tblPath,true);
+
+ moveTaskOutputs(fs, src, src, tblPath,false);
+ fs.delete(src, true);
+
+
+// for (Partition partition : partitionsToAdd){
+// partitionsAdded.add(client.add_partition(partition));
+// // currently following add_partition instead of add_partitions because latter isn't
+// // all-or-nothing and we want to be able to roll back partitions we added if need be.
+// }
- //Copy table level hcat.* keys to the partition
- for(Map.Entry<Object, Object> entry : storer.getProperties().entrySet()) {
- params.put(entry.getKey().toString(), entry.getValue().toString());
- }
+ try {
+ client.add_partitions(partitionsToAdd);
+ partitionsAdded = partitionsToAdd;
+ } catch (Exception e){
+ // There was an error adding partitions : rollback fs copy and rethrow
+ for (Partition p : partitionsToAdd){
+ Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
+ if (fs.exists(ptnPath)){
+ fs.delete(ptnPath,true);
+ }
+ }
+ throw e;
+ }
- partition.setParameters(params);
+ }else{
+ // no harProcessor, regular operation
- // Sets permissions and group name on partition dirs.
- FileStatus tblStat = fs.getFileStatus(tblPath);
- String grpName = tblStat.getGroup();
- FsPermission perms = tblStat.getPermission();
- Path partPath = tblPath;
- for(FieldSchema partKey : table.getPartitionKeys()){
- partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
- fs.setPermission(partPath, perms);
- try{
- fs.setOwner(partPath, null, grpName);
- } catch(AccessControlException ace){
- // log the messages before ignoring. Currently, logging is not built in Hcatalog.
+ // No duplicate partition publish case to worry about because we'll
+ // get a AlreadyExistsException here if so, and appropriately rollback
+
+ client.add_partitions(partitionsToAdd);
+ partitionsAdded = partitionsToAdd;
+
+ if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
+ Path src = new Path(ptnRootLocation);
+ moveTaskOutputs(fs, src, src, tblPath,false);
+ fs.delete(src, true);
}
+
}
-
- //Publish the new partition
- client.add_partition(partition);
- partitionAdded = true; //publish to metastore done
-
+
if( baseCommitter != null ) {
baseCommitter.cleanupJob(context);
}
@@ -247,13 +375,24 @@ public class HCatOutputCommitter extends
(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
client.cancelDelegationToken(tokenStrForm);
}
+
+ String jcTokenStrForm =
+ context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+ String jcTokenSignature =
+ context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
+ if(jcTokenStrForm != null && jcTokenSignature != null) {
+ HCatUtil.cancelJobTrackerDelegationToken(tokenStrForm,jcTokenSignature);
+ }
+
} catch (Exception e) {
- if( partitionAdded ) {
+ if( partitionsAdded.size() > 0 ) {
try {
//baseCommitter.cleanupJob failed, try to clean up the metastore
+ for (Partition p : partitionsAdded){
client.dropPartition(tableInfo.getDatabaseName(),
- tableInfo.getTableName(), values);
+ tableInfo.getTableName(), p.getValues());
+ }
} catch(Exception te) {
//Keep cause as the original exception
throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
@@ -272,6 +411,114 @@ public class HCatOutputCommitter extends
}
}
+ private String getPartitionRootLocation(String ptnLocn,int numPtnKeys) {
+ if (ptnRootLocation == null){
+ // we only need to calculate it once, it'll be the same for other partitions in this job.
+ Path ptnRoot = new Path(ptnLocn);
+ for (int i = 0; i < numPtnKeys; i++){
+// LOG.info("Getting parent of "+ptnRoot.getName());
+ ptnRoot = ptnRoot.getParent();
+ }
+ ptnRootLocation = ptnRoot.toString();
+ }
+// LOG.info("Returning final parent : "+ptnRootLocation);
+ return ptnRootLocation;
+ }
+
+ /**
+ * Generate partition metadata object to be used to add to metadata.
+ * @param partLocnRoot The table-equivalent location root of the partition
+ * (temporary dir if dynamic partition, table dir if static)
+ * @param partKVs The keyvalue pairs that form the partition
+ * @param outputSchema The output schema for the partition
+ * @param params The parameters to store inside the partition
+ * @param table The Table metadata object under which this Partition will reside
+ * @param fs FileSystem object to operate on the underlying filesystem
+ * @param grpName Group name that owns the table dir
+ * @param perms FsPermission that's the default permission of the table dir.
+ * @return Constructed Partition metadata object
+ * @throws IOException
+ */
+
+ private Partition constructPartition(
+ JobContext context,
+ String partLocnRoot, Map<String,String> partKVs,
+ HCatSchema outputSchema, Map<String, String> params,
+ Table table, FileSystem fs,
+ String grpName, FsPermission perms) throws IOException {
+
+ StorageDescriptor tblSD = table.getSd();
+
+ Partition partition = new Partition();
+ partition.setDbName(table.getDbName());
+ partition.setTableName(table.getTableName());
+ partition.setSd(new StorageDescriptor(tblSD));
+
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ for(HCatFieldSchema fieldSchema : outputSchema.getFields()) {
+ fields.add(HCatSchemaUtils.getFieldSchema(fieldSchema));
+ }
+
+ partition.getSd().setCols(fields);
+
+ partition.setValues(getPartitionValueList(table,partKVs));
+
+ partition.setParameters(params);
+
+ // Sets permissions and group name on partition dirs.
+
+ Path partPath = new Path(partLocnRoot);
+ for(FieldSchema partKey : table.getPartitionKeys()){
+ partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
+// LOG.info("Setting perms for "+partPath.toString());
+ fs.setPermission(partPath, perms);
+ try{
+ fs.setOwner(partPath, null, grpName);
+ } catch(AccessControlException ace){
+ // log the messages before ignoring. Currently, logging is not built in Hcatalog.
+// LOG.warn(ace);
+ }
+ }
+ if (dynamicPartitioningUsed){
+ String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table,partKVs);
+ if (harProcessor.isEnabled()){
+ harProcessor.exec(context, partition, partPath);
+ partition.getSd().setLocation(
+ harProcessor.getProcessedLocation(new Path(dynamicPartitionDestination)));
+ }else{
+ partition.getSd().setLocation(dynamicPartitionDestination);
+ }
+ }else{
+ partition.getSd().setLocation(partPath.toString());
+ }
+
+ return partition;
+ }
+
+
+
+ private String getFinalDynamicPartitionDestination(Table table, Map<String,String> partKVs) {
+ // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA ->
+ // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA
+ Path partPath = new Path(table.getSd().getLocation());
+ for(FieldSchema partKey : table.getPartitionKeys()){
+ partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
+ }
+ return partPath.toString();
+ }
+
+ private Map<String, String> getStorerParameterMap(StorerInfo storer) {
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(HCatConstants.HCAT_ISD_CLASS, storer.getInputSDClass());
+ params.put(HCatConstants.HCAT_OSD_CLASS, storer.getOutputSDClass());
+
+ //Copy table level hcat.* keys to the partition
+ for(Map.Entry<Object, Object> entry : storer.getProperties().entrySet()) {
+ params.put(entry.getKey().toString(), entry.getValue().toString());
+ }
+ return params;
+ }
+
private Path constructPartialPartPath(Path partialPath, String partKey, Map<String,String> partKVs){
StringBuilder sb = new StringBuilder(FileUtils.escapePathName(partKey));
@@ -344,31 +591,42 @@ public class HCatOutputCommitter extends
* @param file the file to move
* @param src the source directory
* @param dest the target directory
+ * @param dryRun - a flag that simply tests if this move would succeed or not based
+ * on whether other files exist where we're trying to copy
* @throws IOException
*/
private void moveTaskOutputs(FileSystem fs,
Path file,
Path src,
- Path dest) throws IOException {
+ Path dest, boolean dryRun) throws IOException {
if (fs.isFile(file)) {
Path finalOutputPath = getFinalPath(file, src, dest);
- if (!fs.rename(file, finalOutputPath)) {
- if (!fs.delete(finalOutputPath, true)) {
- throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
+ if (dryRun){
+// LOG.info("Testing if moving ["+file+"] to ["+finalOutputPath+"] would cause a problem");
+ if (fs.exists(finalOutputPath)){
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath + ", duplicate publish possible.");
}
+ }else{
+// LOG.info("Moving ["+file+"] to ["+finalOutputPath+"]");
if (!fs.rename(file, finalOutputPath)) {
- throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest);
+ if (!fs.delete(finalOutputPath, true)) {
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
+ }
+ if (!fs.rename(file, finalOutputPath)) {
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest);
+ }
}
}
} else if(fs.getFileStatus(file).isDir()) {
FileStatus[] paths = fs.listStatus(file);
Path finalOutputPath = getFinalPath(file, src, dest);
- fs.mkdirs(finalOutputPath);
-
+ if (!dryRun){
+ fs.mkdirs(finalOutputPath);
+ }
if (paths != null) {
for (FileStatus path : paths) {
- moveTaskOutputs(fs, path.getPath(), src, dest);
+ moveTaskOutputs(fs, path.getPath(), src, dest,dryRun);
}
}
}
@@ -398,4 +656,72 @@ public class HCatOutputCommitter extends
}
}
+ /**
+ * Run to discover dynamic partitions available
+ */
+ private void discoverPartitions(JobContext context) throws IOException {
+ if (!partitionsDiscovered){
+ // LOG.info("discover ptns called");
+
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+
+ harProcessor.setEnabled(jobInfo.getHarRequested());
+
+ List<Integer> dynamicPartCols = jobInfo.getPosOfDynPartCols();
+ int maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
+
+ Path loadPath = new Path(jobInfo.getLocation());
+ FileSystem fs = loadPath.getFileSystem(context.getConfiguration());
+
+ // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
+
+ String dynPathSpec = loadPath.toUri().getPath();
+ dynPathSpec = dynPathSpec.replaceAll("__HIVE_DEFAULT_PARTITION__", "*");
+ // TODO : replace this with a param pull from HiveConf
+
+ // LOG.info("Searching for "+dynPathSpec);
+ Path pathPattern = new Path(loadPath, dynPathSpec);
+ FileStatus[] status = fs.globStatus(pathPattern);
+
+ partitionsDiscoveredByPath = new LinkedHashMap<String,Map<String, String>>();
+ storageDriversDiscoveredByPath = new LinkedHashMap<String,HCatOutputStorageDriver>();
+
+
+ if (status.length == 0) {
+ // LOG.warn("No partition found genereated by dynamic partitioning in ["
+ // +loadPath+"] with depth["+jobInfo.getTable().getPartitionKeysSize()
+ // +"], dynSpec["+dynPathSpec+"]");
+ }else{
+ if ((maxDynamicPartitions != -1) && (status.length > maxDynamicPartitions)){
+ this.partitionsDiscovered = true;
+ throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
+ "Number of dynamic partitions being created "
+ + "exceeds configured max allowable partitions["
+ + maxDynamicPartitions
+ + "], increase parameter ["
+ + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+ + "] if needed.");
+ }
+
+ for (FileStatus st : status){
+ LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>();
+ Warehouse.makeSpecFromName(fullPartSpec, st.getPath());
+ partitionsDiscoveredByPath.put(st.getPath().toString(),fullPartSpec);
+ storageDriversDiscoveredByPath.put(st.getPath().toString(),
+ HCatOutputFormat.getOutputDriverInstance(context, jobInfo, fullPartSpec));
+ }
+ }
+
+ // for (Entry<String,Map<String,String>> spec : partitionsDiscoveredByPath.entrySet()){
+ // LOG.info("Partition "+ spec.getKey());
+ // for (Entry<String,String> e : spec.getValue().entrySet()){
+ // LOG.info(e.getKey() + "=>" +e.getValue());
+ // }
+ // }
+
+ this.partitionsDiscovered = true;
+ }
+ }
+
+
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -21,11 +21,14 @@ package org.apache.hcatalog.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Map.Entry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -54,6 +57,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatException;
@@ -66,9 +70,15 @@ import org.apache.thrift.TException;
* and should be given as null. The value is the HCatRecord to write.*/
public class HCatOutputFormat extends HCatBaseOutputFormat {
+// static final private Log LOG = LogFactory.getLog(HCatOutputFormat.class);
+
/** The directory under which data is initially written for a non partitioned table */
protected static final String TEMP_DIR_NAME = "_TEMP";
- private static Map<String, Token<DelegationTokenIdentifier>> tokenMap = new HashMap<String, Token<DelegationTokenIdentifier>>();
+
+ /** */
+ protected static final String DYNTEMP_DIR_NAME = "_DYN";
+
+ private static Map<String, Token<? extends AbstractDelegationTokenIdentifier>> tokenMap = new HashMap<String, Token<? extends AbstractDelegationTokenIdentifier>>();
private static final PathFilter hiddenFileFilter = new PathFilter(){
public boolean accept(Path p){
@@ -76,6 +86,9 @@ public class HCatOutputFormat extends HC
return !name.startsWith("_") && !name.startsWith(".");
}
};
+
+ private static int maxDynamicPartitions;
+ private static boolean harRequested;
/**
* Set the info about the output to write for the Job. This queries the metadata server
@@ -90,17 +103,58 @@ public class HCatOutputFormat extends HC
try {
- Configuration conf = job.getConfiguration();
+ Configuration conf = job.getConfiguration();
client = createHiveClient(outputInfo.getServerUri(), conf);
Table table = client.getTable(outputInfo.getDatabaseName(), outputInfo.getTableName());
- if( outputInfo.getPartitionValues() == null ) {
+ if (table.getPartitionKeysSize() == 0 ){
+ if ((outputInfo.getPartitionValues() != null) && (!outputInfo.getPartitionValues().isEmpty())){
+ // attempt made to save partition values in non-partitioned table - throw error.
+ throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
+ "Partition values specified for non-partitioned table");
+ }
+ // non-partitioned table
outputInfo.setPartitionValues(new HashMap<String, String>());
+
} else {
- //Convert user specified map to have lower case key names
+ // partitioned table, we expect partition values
+ // convert user specified map to have lower case key names
Map<String, String> valueMap = new HashMap<String, String>();
- for(Map.Entry<String, String> entry : outputInfo.getPartitionValues().entrySet()) {
- valueMap.put(entry.getKey().toLowerCase(), entry.getValue());
+ if (outputInfo.getPartitionValues() != null){
+ for(Map.Entry<String, String> entry : outputInfo.getPartitionValues().entrySet()) {
+ valueMap.put(entry.getKey().toLowerCase(), entry.getValue());
+ }
+ }
+
+ if (
+ (outputInfo.getPartitionValues() == null)
+ || (outputInfo.getPartitionValues().size() < table.getPartitionKeysSize())
+ ){
+ // dynamic partition usecase - partition values were null, or not all were specified
+ // need to figure out which keys are not specified.
+ List<String> dynamicPartitioningKeys = new ArrayList<String>();
+ boolean firstItem = true;
+ for (FieldSchema fs : table.getPartitionKeys()){
+ if (!valueMap.containsKey(fs.getName().toLowerCase())){
+ dynamicPartitioningKeys.add(fs.getName().toLowerCase());
+ }
+ }
+
+ if (valueMap.size() + dynamicPartitioningKeys.size() != table.getPartitionKeysSize()){
+ // If this isn't equal, then bogus key values have been inserted, error out.
+ throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,"Invalid partition keys specified");
+ }
+
+ outputInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys);
+ String dynHash;
+ if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null){
+ dynHash = String.valueOf(Math.random());
+// LOG.info("New dynHash : ["+dynHash+"]");
+// }else{
+// LOG.info("Old dynHash : ["+dynHash+"]");
+ }
+ conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash);
+
}
outputInfo.setPartitionValues(valueMap);
@@ -125,11 +179,13 @@ public class HCatOutputFormat extends HC
String tblLocation = tblSD.getLocation();
String location = driver.getOutputLocation(job,
tblLocation, partitionCols,
- outputInfo.getPartitionValues());
+ outputInfo.getPartitionValues(),conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID));
//Serialize the output info into the configuration
OutputJobInfo jobInfo = new OutputJobInfo(outputInfo,
tableSchema, tableSchema, storerInfo, location, table);
+ jobInfo.setHarRequested(harRequested);
+ jobInfo.setMaximumDynamicPartitions(maxDynamicPartitions);
conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
Path tblPath = new Path(tblLocation);
@@ -176,6 +232,7 @@ public class HCatOutputFormat extends HC
// TableInfo, we can have as many tokens as there are stores and the TokenSelector
// will correctly pick the right tokens which the committer will use and
// cancel.
+
String tokenSignature = getTokenSignature(outputInfo);
if(tokenMap.get(tokenSignature) == null) {
// get delegation tokens from hcat server and store them into the "job"
@@ -183,19 +240,32 @@ public class HCatOutputFormat extends HC
// hcat
// when the JobTracker in Hadoop MapReduce starts supporting renewal of
// arbitrary tokens, the renewer should be the principal of the JobTracker
- String tokenStrForm = client.getDelegationToken(ugi.getUserName());
- Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
- t.decodeFromUrlString(tokenStrForm);
- t.setService(new Text(tokenSignature));
- tokenMap.put(tokenSignature, t);
+ tokenMap.put(tokenSignature, HCatUtil.extractThriftToken(
+ client.getDelegationToken(ugi.getUserName()),
+ tokenSignature));
+ }
+
+ String jcTokenSignature = "jc."+tokenSignature;
+ if(tokenMap.get(jcTokenSignature) == null) {
+ tokenMap.put(jcTokenSignature,
+ HCatUtil.getJobTrackerDelegationToken(conf,ugi.getUserName()));
}
+
job.getCredentials().addToken(new Text(ugi.getUserName() + tokenSignature),
tokenMap.get(tokenSignature));
// this will be used by the outputcommitter to pass on to the metastore client
// which in turn will pass on to the TokenSelector so that it can select
// the right token.
+ job.getCredentials().addToken(new Text(ugi.getUserName() + jcTokenSignature),
+ tokenMap.get(jcTokenSignature));
+
job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
- }
+ job.getConfiguration().set(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE, jcTokenSignature);
+ job.getConfiguration().set(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM, tokenMap.get(jcTokenSignature).encodeToUrlString());
+
+// LOG.info("Set hive dt["+tokenSignature+"]");
+// LOG.info("Set jt dt["+jcTokenSignature+"]");
+ }
}
} catch(Exception e) {
if( e instanceof HCatException ) {
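
With this change the job credentials carry two delegation tokens: the metastore token stored under the plain token signature, and a JobTracker token stored under the "jc."-prefixed signature. A minimal sketch of looking a token back up by the alias it was stored under, assuming only the Hadoop Credentials API (the helper class is hypothetical):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.security.token.Token;

    class TokenLookup {
        // Fetches a previously stored token by the alias it was registered under,
        // i.e. the user name concatenated with the token signature.
        static Token<?> lookup(Job job, String userName, String tokenSignature) {
            Text alias = new Text(userName + tokenSignature);
            return job.getCredentials().getToken(alias);
        }
    }
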
@@ -207,10 +277,10 @@ public class HCatOutputFormat extends HC
if( client != null ) {
client.close();
}
+// HCatUtil.logAllTokens(LOG,job);
}
}
-
// a signature string to associate with a HCatTableInfo - essentially
// a concatenation of dbname, tablename and partition keyvalues.
private static String getTokenSignature(HCatTableInfo outputInfo) {
@@ -232,11 +302,10 @@ public class HCatOutputFormat extends HC
return result.toString();
}
-
-
/**
* Handles duplicate publish of partition. Fails if partition already exists.
* For non partitioned tables, fails if files are present in table directory.
+ * For dynamically partitioned publishes, does nothing - the check needs to happen at record-writer time
* @param job the job
* @param outputInfo the output info
* @param client the metastore client
@@ -247,18 +316,33 @@ public class HCatOutputFormat extends HC
*/
private static void handleDuplicatePublish(Job job, HCatTableInfo outputInfo,
HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException {
- List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
- table, outputInfo.getPartitionValues());
+
+ /*
+ * For a fully specified partition, do strict checks for the partition's existence in metadata
+ * For unpartitioned tables, do file checks
+ * For partially specified (dynamically partitioned) tables:
+ * file checks have to happen at the start of each partition write instead,
+ * because metadata checks can get very expensive (fat conf) when a large
+ * number of partitions match the partial specification
+ */
if( table.getPartitionKeys().size() > 0 ) {
- //For partitioned table, fail if partition is already present
- List<String> currentParts = client.listPartitionNames(outputInfo.getDatabaseName(),
- outputInfo.getTableName(), partitionValues, (short) 1);
+ if (!outputInfo.isDynamicPartitioningUsed()){
+ List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
+ table, outputInfo.getPartitionValues());
+ // fully-specified partition
+ List<String> currentParts = client.listPartitionNames(outputInfo.getDatabaseName(),
+ outputInfo.getTableName(), partitionValues, (short) 1);
- if( currentParts.size() > 0 ) {
- throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION);
+ if( currentParts.size() > 0 ) {
+ throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION);
+ }
}
} else {
+ List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
+ table, outputInfo.getPartitionValues());
+ // non-partitioned table
+
Path tablePath = new Path(table.getSd().getLocation());
FileSystem fs = tablePath.getFileSystem(job.getConfiguration());
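
For the fully specified case, the duplicate check asks the metastore for at most one partition name matching the given values; any hit means the partition was already published. The same check in isolation might look like the sketch below, assuming an already-connected HiveMetaStoreClient (the wrapper class is hypothetical):

    import java.util.List;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

    class DuplicatePartitionCheck {
        // Returns true if a partition with the given value list already exists.
        static boolean partitionExists(HiveMetaStoreClient client, String dbName,
                String tableName, List<String> partitionValues) throws Exception {
            List<String> current =
                client.listPartitionNames(dbName, tableName, partitionValues, (short) 1);
            return !current.isEmpty();
        }
    }
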
@@ -299,24 +383,12 @@ public class HCatOutputFormat extends HC
getRecordWriter(TaskAttemptContext context
) throws IOException, InterruptedException {
- // First create the RW.
HCatRecordWriter rw = new HCatRecordWriter(context);
-
- // Now set permissions and group on freshly created files.
- OutputJobInfo info = getJobInfo(context);
- Path workFile = rw.getStorageDriver().getWorkFilePath(context,info.getLocation());
- Path tblPath = new Path(info.getTable().getSd().getLocation());
- FileSystem fs = tblPath.getFileSystem(context.getConfiguration());
- FileStatus tblPathStat = fs.getFileStatus(tblPath);
- fs.setPermission(workFile, tblPathStat.getPermission());
- try{
- fs.setOwner(workFile, null, tblPathStat.getGroup());
- } catch(AccessControlException ace){
- // log the messages before ignoring. Currently, logging is not built in HCat.
- }
+ rw.prepareForStorageDriverOutput(context);
return rw;
}
+
/**
* Get the output committer for this output format. This is responsible
* for ensuring the output is committed correctly.
@@ -329,10 +401,17 @@ public class HCatOutputFormat extends HC
public OutputCommitter getOutputCommitter(TaskAttemptContext context
) throws IOException, InterruptedException {
OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
- return new HCatOutputCommitter(outputFormat.getOutputCommitter(context));
+ return new HCatOutputCommitter(context,outputFormat.getOutputCommitter(context));
}
static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException {
+ HiveConf hiveConf = getHiveConf(url, conf);
+// HCatUtil.logHiveConf(LOG, hiveConf);
+ return new HiveMetaStoreClient(hiveConf);
+ }
+
+
+ private static HiveConf getHiveConf(String url, Configuration conf) throws IOException {
HiveConf hiveConf = new HiveConf(HCatOutputFormat.class);
if( url != null ) {
@@ -372,9 +451,48 @@ public class HCatOutputFormat extends HC
}
}
+
+ // figure out the maximum number of dynamic partitions allowed, so we can pass it on in the output job info
+ if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
+ maxDynamicPartitions = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
+ }else{
+ maxDynamicPartitions = -1; // disables bounds checking for maximum number of dynamic partitions
+ }
+ harRequested = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED);
+ return hiveConf;
+ }
- return new HiveMetaStoreClient(hiveConf);
+ /**
+ * Does any initialization of file paths and sets permissions and group on freshly created files.
+ * This is called at RecordWriter instantiation time, which can be at write time for
+ * a dynamic partitioning use case
+ * @param context
+ * @throws IOException
+ */
+ public static void prepareOutputLocation(HCatOutputStorageDriver osd, TaskAttemptContext context) throws IOException {
+ OutputJobInfo info = HCatBaseOutputFormat.getJobInfo(context);
+// Path workFile = osd.getWorkFilePath(context,info.getLocation());
+ Path workFile = osd.getWorkFilePath(context,context.getConfiguration().get("mapred.output.dir"));
+ Path tblPath = new Path(info.getTable().getSd().getLocation());
+ FileSystem fs = tblPath.getFileSystem(context.getConfiguration());
+ FileStatus tblPathStat = fs.getFileStatus(tblPath);
+
+// LOG.info("Attempting to set permission ["+tblPathStat.getPermission()+"] on ["+
+// workFile+"], location=["+info.getLocation()+"] , mapred.locn =["+
+// context.getConfiguration().get("mapred.output.dir")+"]");
+//
+// FileStatus wFileStatus = fs.getFileStatus(workFile);
+// LOG.info("Table : "+tblPathStat.getPath());
+// LOG.info("Working File : "+wFileStatus.getPath());
+
+ fs.setPermission(workFile, tblPathStat.getPermission());
+ try{
+ fs.setOwner(workFile, null, tblPathStat.getGroup());
+ } catch(AccessControlException ace){
+ // log the messages before ignoring. Currently, logging is not built in HCat.
+ }
}
+
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java Fri Jul 22 23:38:07 2011
@@ -22,15 +22,23 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
@@ -40,6 +48,7 @@ import org.apache.hcatalog.data.schema.H
*/
public abstract class HCatOutputStorageDriver {
+
/**
* Initialize the storage driver with specified properties, default implementation does nothing.
* @param context the job context object
@@ -103,13 +112,22 @@ public abstract class HCatOutputStorageD
* @param jobContext the job context object
* @param tableLocation the location of the table
* @param partitionValues the partition values
+ * @param dynHash a unique hash value that identifies the dynamic partitioning job in use, or null if none
* @return the location String.
* @throws IOException Signals that an I/O exception has occurred.
*/
public String getOutputLocation(JobContext jobContext,
- String tableLocation, List<String> partitionCols, Map<String, String> partitionValues) throws IOException {
+ String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
+
+ String parentPath = tableLocation;
+ // For dynamically partitioned writes where not all key values are specified,
+ // we create a temp dir for the associated write job
+ if (dynHash != null){
+ parentPath = new Path(tableLocation, HCatOutputFormat.DYNTEMP_DIR_NAME+dynHash).toString();
+ }
- if( partitionValues == null || partitionValues.size() == 0 ) {
+ // For non-partitioned tables, we send the output to the temp dir
+ if((dynHash == null) && ( partitionValues == null || partitionValues.size() == 0 )) {
return new Path(tableLocation, HCatOutputFormat.TEMP_DIR_NAME).toString();
}
@@ -120,7 +138,7 @@ public abstract class HCatOutputStorageD
String partitionLocation = FileUtils.makePartName(partitionCols, values);
- Path path = new Path(tableLocation, partitionLocation);
+ Path path = new Path(parentPath, partitionLocation);
return path.toString();
}
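
With the dynHash argument, a dynamically partitioned write is rooted under a per-job temp directory inside the table location rather than directly under the table location, with the usual key=value partition path below it. A rough sketch of the resulting path composition, where the "_DYN" prefix is only a stand-in for HCatOutputFormat.DYNTEMP_DIR_NAME (the constant's actual value is not shown in this diff):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.FileUtils;

    class OutputLocationSketch {
        public static void main(String[] args) {
            String tableLocation = "/warehouse/mytable";   // illustrative table location
            String dynTempPrefix = "_DYN";                  // stand-in for DYNTEMP_DIR_NAME
            String dynHash = "0.123456";                    // per-job random hash
            List<String> partitionCols = Arrays.asList("ds", "region");
            List<String> values = Arrays.asList("2011-07-22", "us");

            String parentPath = new Path(tableLocation, dynTempPrefix + dynHash).toString();
            String partitionLocation = FileUtils.makePartName(partitionCols, values);
            // prints something like /warehouse/mytable/_DYN0.123456/ds=2011-07-22/region=us
            System.out.println(new Path(parentPath, partitionLocation));
        }
    }
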
@@ -130,4 +148,59 @@ public abstract class HCatOutputStorageD
public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException{
return new Path(new FileOutputCommitter(new Path(outputLoc), context).getWorkPath(), FileOutputFormat.getUniqueFile(context, "part",""));
}
+
+ /**
+ * Implementation that calls the underlying output committer's setupJob,
+ * used in lieu of the underlying committer's setupJob when using dynamic partitioning.
+ * The default implementation should be overridden by implementations
+ * that do not use FileOutputCommitter.
+ * This function exists so that a storage driver implementor can
+ * override the underlying OutputCommitter's setupJob behavior and make it
+ * idempotent, i.e. safe to call multiple times in a job.
+ * It should be written so that it can be called multiple times
+ * from individual tasks without the tasks stepping on each other's toes.
+ *
+ * @param context
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public void setupOutputCommitterJob(TaskAttemptContext context)
+ throws IOException, InterruptedException{
+ getOutputFormat().getOutputCommitter(context).setupJob(context);
+ }
+
+ /**
+ * Implementation that calls the underlying output committer's cleanupJob,
+ * used in lieu of the underlying committer's cleanupJob when using dynamic partitioning.
+ * It should be safe to call after multiple underlying output committers have
+ * written to task dirs inside it.
+ * While the base MR cleanupJob would normally suffice, this is provided
+ * so that implementors of setupOutputCommitterJob can clean up properly.
+ *
+ * @param context
+ * @throws IOException
+ */
+ public void cleanupOutputCommitterJob(TaskAttemptContext context)
+ throws IOException, InterruptedException{
+ getOutputFormat().getOutputCommitter(context).cleanupJob(context);
+ }
+
+ /**
+ * Implementation that calls the underlying output committer's abortJob,
+ * used in lieu of the underlying committer's abortJob when using dynamic partitioning.
+ * It should be safe to call after multiple underlying output committers have
+ * written to task dirs inside it.
+ * While the base MR abortJob would normally suffice, this is provided
+ * so that implementors of setupOutputCommitterJob can abort properly.
+ *
+ * @param context
+ * @param state
+ * @throws IOException
+ */
+ public void abortOutputCommitterJob(TaskAttemptContext context, State state)
+ throws IOException, InterruptedException{
+ getOutputFormat().getOutputCommitter(context).abortJob(context,state);
+ }
+
+
}
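
An override of setupOutputCommitterJob has to tolerate being invoked by several tasks within the same job. One way to keep such a setup idempotent is sketched below, under the assumption that skipping directory creation when the directory already exists is sufficient for the underlying committer (the helper class is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class IdempotentJobSetup {
        // Creates the job-level output directory only if it is not already there,
        // so that repeated calls from different tasks neither fail nor clobber each other.
        static void ensureOutputDir(Configuration conf, Path jobOutputDir) throws IOException {
            FileSystem fs = jobOutputDir.getFileSystem(conf);
            if (!fs.exists(jobOutputDir)) {
                fs.mkdirs(jobOutputDir);
            }
        }
    }
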
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java Fri Jul 22 23:38:07 2011
@@ -18,60 +18,174 @@
package org.apache.hcatalog.mapreduce;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.HCatRecord;
public class HCatRecordWriter extends RecordWriter<WritableComparable<?>, HCatRecord> {
private final HCatOutputStorageDriver storageDriver;
- /**
- * @return the storageDriver
- */
- public HCatOutputStorageDriver getStorageDriver() {
- return storageDriver;
- }
+
+ private boolean dynamicPartitioningUsed = false;
+
+// static final private Log LOG = LogFactory.getLog(HCatRecordWriter.class);
private final RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter;
+ private final Map<Integer,RecordWriter<? super WritableComparable<?>, ? super Writable>> baseDynamicWriters;
+ private final Map<Integer,HCatOutputStorageDriver> baseDynamicStorageDrivers;
+
private final List<Integer> partColsToDel;
+ private final List<Integer> dynamicPartCols;
+ private int maxDynamicPartitions;
+
+ private OutputJobInfo jobInfo;
+ private TaskAttemptContext context;
public HCatRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
- OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+ jobInfo = HCatOutputFormat.getJobInfo(context);
+ this.context = context;
// If partition columns occur in data, we want to remove them.
partColsToDel = jobInfo.getPosOfPartCols();
+ dynamicPartitioningUsed = jobInfo.getTableInfo().isDynamicPartitioningUsed();
+ dynamicPartCols = jobInfo.getPosOfDynPartCols();
+ maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
- if(partColsToDel == null){
+ if((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))){
throw new HCatException("It seems that setSchema() is not called on " +
"HCatOutputFormat. Please make sure that method is called.");
}
+
+
+ if (!dynamicPartitioningUsed){
+ this.storageDriver = HCatOutputFormat.getOutputDriverInstance(context, jobInfo);
+ this.baseWriter = storageDriver.getOutputFormat().getRecordWriter(context);
+ this.baseDynamicStorageDrivers = null;
+ this.baseDynamicWriters = null;
+ }else{
+ this.baseDynamicStorageDrivers = new HashMap<Integer,HCatOutputStorageDriver>();
+ this.baseDynamicWriters = new HashMap<Integer,RecordWriter<? super WritableComparable<?>, ? super Writable>>();
+ this.storageDriver = null;
+ this.baseWriter = null;
+ }
- this.storageDriver = HCatOutputFormat.getOutputDriverInstance(context, jobInfo);
- this.baseWriter = storageDriver.getOutputFormat().getRecordWriter(context);
+ }
+
+ /**
+ * @return the storageDriver
+ */
+ public HCatOutputStorageDriver getStorageDriver() {
+ return storageDriver;
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
+ if (dynamicPartitioningUsed){
+ for (RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()){
+ bwriter.close(context);
+ }
+ for (HCatOutputStorageDriver osd : baseDynamicStorageDrivers.values()){
+ OutputCommitter baseOutputCommitter = osd.getOutputFormat().getOutputCommitter(context);
+ if (baseOutputCommitter.needsTaskCommit(context)){
+ baseOutputCommitter.commitTask(context);
+ }
+ }
+ } else {
baseWriter.close(context);
+ }
}
@Override
public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
InterruptedException {
+ RecordWriter<? super WritableComparable<?>, ? super Writable> localWriter;
+ HCatOutputStorageDriver localDriver;
+
+// HCatUtil.logList(LOG, "HCatRecord to write", value.getAll());
+
+ if (dynamicPartitioningUsed){
+ // calculate which writer to use from the remaining values - this needs to be done before we delete cols
+
+ List<String> dynamicPartValues = new ArrayList<String>();
+ for (Integer colToAppend : dynamicPartCols){
+ dynamicPartValues.add(value.get(colToAppend).toString());
+ }
+
+ int dynHashCode = dynamicPartValues.hashCode();
+ if (!baseDynamicWriters.containsKey(dynHashCode)){
+// LOG.info("Creating new storage driver["+baseDynamicStorageDrivers.size()
+// +"/"+maxDynamicPartitions+ "] for "+dynamicPartValues.toString());
+ if ((maxDynamicPartitions != -1) && (baseDynamicStorageDrivers.size() > maxDynamicPartitions)){
+ throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
+ "Number of dynamic partitions being created "
+ + "exceeds configured max allowable partitions["
+ + maxDynamicPartitions
+ + "], increase parameter ["
+ + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+ + "] if needed.");
+ }
+// HCatUtil.logList(LOG, "dynamicpartvals", dynamicPartValues);
+// HCatUtil.logList(LOG, "dynamicpartCols", dynamicPartCols);
+
+ HCatOutputStorageDriver localOsd = createDynamicStorageDriver(dynamicPartValues);
+ RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter
+ = localOsd.getOutputFormat().getRecordWriter(context);
+ localOsd.setupOutputCommitterJob(context);
+ OutputCommitter baseOutputCommitter = localOsd.getOutputFormat().getOutputCommitter(context);
+ baseOutputCommitter.setupTask(context);
+ prepareForStorageDriverOutput(localOsd,context);
+ baseDynamicWriters.put(dynHashCode, baseRecordWriter);
+ baseDynamicStorageDrivers.put(dynHashCode,localOsd);
+ }
+
+ localWriter = baseDynamicWriters.get(dynHashCode);
+ localDriver = baseDynamicStorageDrivers.get(dynHashCode);
+ }else{
+ localWriter = baseWriter;
+ localDriver = storageDriver;
+ }
for(Integer colToDel : partColsToDel){
value.remove(colToDel);
}
- //The key given by user is ignored
- WritableComparable<?> generatedKey = storageDriver.generateKey(value);
- Writable convertedValue = storageDriver.convertValue(value);
- baseWriter.write(generatedKey, convertedValue);
+
+ //The key given by user is ignored
+ WritableComparable<?> generatedKey = localDriver.generateKey(value);
+ Writable convertedValue = localDriver.convertValue(value);
+ localWriter.write(generatedKey, convertedValue);
+ }
+
+ protected HCatOutputStorageDriver createDynamicStorageDriver(List<String> dynamicPartVals) throws IOException {
+ HCatOutputStorageDriver localOsd = HCatOutputFormat.getOutputDriverInstance(context,jobInfo,dynamicPartVals);
+ return localOsd;
+ }
+
+ public void prepareForStorageDriverOutput(TaskAttemptContext context) throws IOException {
+ // Set permissions and group on freshly created files.
+ if (!dynamicPartitioningUsed){
+ HCatOutputStorageDriver localOsd = this.getStorageDriver();
+ prepareForStorageDriverOutput(localOsd,context);
+ }
+ }
+
+ private void prepareForStorageDriverOutput(HCatOutputStorageDriver localOsd,
+ TaskAttemptContext context) throws IOException {
+ HCatOutputFormat.prepareOutputLocation(localOsd,context);
}
}
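
In the dynamic partitioning path, the record writer above keeps one base writer and storage driver per distinct combination of dynamic partition values, keyed by the hash code of the value list, and errors out once the number of combinations exceeds the configured maximum. The caching pattern in isolation looks roughly like the sketch below, with a generic type standing in for the real writer type (all names are illustrative):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class PerPartitionWriterCache<W> {
        private final Map<Integer, W> writers = new HashMap<Integer, W>();
        private final int maxDynamicPartitions;   // -1 disables the bound

        PerPartitionWriterCache(int maxDynamicPartitions) {
            this.maxDynamicPartitions = maxDynamicPartitions;
        }

        // Returns the cached writer for this combination of partition values,
        // creating one via the factory the first time the combination is seen.
        W writerFor(List<String> dynamicPartValues, WriterFactory<W> factory) {
            int key = dynamicPartValues.hashCode();
            W writer = writers.get(key);
            if (writer == null) {
                if (maxDynamicPartitions != -1 && writers.size() > maxDynamicPartitions) {
                    throw new IllegalStateException(
                        "Number of dynamic partitions exceeds the configured maximum: " + maxDynamicPartitions);
                }
                writer = factory.create(dynamicPartValues);
                writers.put(key, writer);
            }
            return writer;
        }

        interface WriterFactory<W> {
            W create(List<String> partitionValues);
        }
    }
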