Posted to hcatalog-commits@incubator.apache.org by ha...@apache.org on 2011/08/01 17:17:06 UTC
svn commit: r1152865 [1/2] - in /incubator/hcatalog/trunk/src:
java/org/apache/hcatalog/common/ java/org/apache/hcatalog/mapreduce/
java/org/apache/hcatalog/pig/ java/org/apache/hcatalog/rcfile/
test/org/apache/hcatalog/listener/ test/org/apache/hcatal...
Author: hashutosh
Date: Mon Aug 1 17:17:04 2011
New Revision: 1152865
URL: http://svn.apache.org/viewvc?rev=1152865&view=rev
Log:
Reverting HCatalog-64
Removed:
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
Modified:
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/JobInfo.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileInputDriver.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
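
For context on the API this revert restores: the HCatalog-64 changes routed read setup through InputJobInfo.create(...) (visible as removed lines in the HCatEximInputFormat and HCatInputFormat diffs below); this commit brings back the HCatTableInfo factory methods. A minimal sketch of the restored read-path setup, built only from the signatures visible in the diffs below; the server URI, database, and table names are placeholders:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.HCatTableInfo;

    class RevertedReadSetup {
        // Read-path setup under the restored API. All string arguments
        // below are placeholders, not values taken from this commit.
        static void configure(Job job) throws IOException {
            HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(
                "thrift://metastore-host:9083", // serverUri (placeholder)
                null,                           // serverKerberosPrincipal
                "default",                      // dbName
                "mytable");                     // tableName
            HCatInputFormat.setInput(job, inputInfo);
        }
    }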
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java Mon Aug 1 17:17:04 2011
@@ -18,8 +18,24 @@
package org.apache.hcatalog.common;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -31,9 +47,8 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
@@ -41,12 +56,10 @@ import org.apache.hcatalog.data.schema.H
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
-import java.io.*;
-import java.util.*;
-import java.util.Map.Entry;
-
public class HCatUtil {
// static final private Log LOG = LogFactory.getLog(HCatUtil.class);
@@ -170,22 +183,6 @@ public class HCatUtil {
}
/**
- * return the partition columns from a table instance
- * @param table the instance to extract partition columns from
- * @return HCatSchema instance which contains the partition columns
- * @throws IOException
- */
- public static HCatSchema getPartitionColumns(Table table) throws IOException{
- HCatSchema cols = new HCatSchema(new LinkedList<HCatFieldSchema>());
- if( table.getPartitionKeys().size() != 0 ) {
- for (FieldSchema fs : table.getPartitionKeys()){
- cols.append(HCatSchemaUtils.getHCatFieldSchema(fs));
- }
- }
- return cols;
- }
-
- /**
* Validate partition schema, checks if the column types match between the partition
* and the existing table schema. Returns the list of columns present in the partition
* but not in the table.
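
The HCatUtil diff above removes the getPartitionColumns helper; the call sites later in this commit rely on HCatUtil.getTableSchemaWithPtnCols(table) instead. A minimal sketch of that combined-schema behavior, inferred from the deleted helper and its call sites, not the actual implementation:

    import java.io.IOException;
    import java.util.LinkedList;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.hcatalog.data.schema.HCatFieldSchema;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.data.schema.HCatSchemaUtils;

    class TableSchemaSketch {
        // Approximation of getTableSchemaWithPtnCols: data columns from the
        // storage descriptor, then partition columns appended, so callers
        // see one flat schema. Inferred behavior, not the real code.
        static HCatSchema withPtnCols(Table table) throws IOException {
            HCatSchema schema = new HCatSchema(new LinkedList<HCatFieldSchema>());
            for (FieldSchema fs : table.getSd().getCols()) {
                schema.append(HCatSchemaUtils.getHCatFieldSchema(fs));
            }
            for (FieldSchema fs : table.getPartitionKeys()) {
                schema.append(HCatSchemaUtils.getHCatFieldSchema(fs));
            }
            return schema;
        }
    }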
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java Mon Aug 1 17:17:04 2011
@@ -18,19 +18,22 @@
package org.apache.hcatalog.mapreduce;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
public abstract class HCatBaseInputFormat extends InputFormat<WritableComparable, HCatRecord> {
/**
@@ -72,15 +75,15 @@ public abstract class HCatBaseInputForma
//Get the job info from the configuration,
//throws exception if not initialized
- InputJobInfo inputJobInfo;
+ JobInfo jobInfo;
try {
- inputJobInfo = getJobInfo(jobContext);
+ jobInfo = getJobInfo(jobContext);
} catch (Exception e) {
throw new IOException(e);
}
List<InputSplit> splits = new ArrayList<InputSplit>();
- List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
+ List<PartInfo> partitionInfoList = jobInfo.getPartitions();
if(partitionInfoList == null ) {
//No partitions match the specified partition filter
return splits;
@@ -96,14 +99,8 @@ public abstract class HCatBaseInputForma
throw new IOException(e);
}
- HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
- for(HCatFieldSchema field: inputJobInfo.getTableInfo().getDataColumns().getFields())
- allCols.append(field);
- for(HCatFieldSchema field: inputJobInfo.getTableInfo().getPartitionColumns().getFields())
- allCols.append(field);
-
//Pass all required information to the storage driver
- initStorageDriver(storageDriver, localJob, partitionInfo, allCols);
+ initStorageDriver(storageDriver, localJob, partitionInfo, jobInfo.getTableSchema());
//Get the input format for the storage driver
InputFormat inputFormat =
@@ -117,7 +114,7 @@ public abstract class HCatBaseInputForma
splits.add(new HCatSplit(
partitionInfo,
split,
- allCols));
+ jobInfo.getTableSchema()));
}
}
@@ -142,10 +139,10 @@ public abstract class HCatBaseInputForma
HCatSplit hcatSplit = (HCatSplit) split;
PartInfo partitionInfo = hcatSplit.getPartitionInfo();
- //If running through a Pig job, the InputJobInfo will not be available in the
+ //If running through a Pig job, the JobInfo will not be available in the
//backend process context (since HCatLoader works on a copy of the JobContext and does
//not call HCatInputFormat.setInput in the backend process).
- //So this function should NOT attempt to read the InputJobInfo.
+ //So this function should NOT attempt to read the JobInfo.
HCatInputStorageDriver storageDriver;
try {
@@ -177,30 +174,25 @@ public abstract class HCatBaseInputForma
* @throws Exception if HCatInputFormat.setInput has not been called for the current context
*/
public static HCatSchema getTableSchema(JobContext context) throws Exception {
- InputJobInfo inputJobInfo = getJobInfo(context);
- HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
- for(HCatFieldSchema field: inputJobInfo.getTableInfo().getDataColumns().getFields())
- allCols.append(field);
- for(HCatFieldSchema field: inputJobInfo.getTableInfo().getPartitionColumns().getFields())
- allCols.append(field);
- return allCols;
+ JobInfo jobInfo = getJobInfo(context);
+ return jobInfo.getTableSchema();
}
/**
- * Gets the InputJobInfo object by reading the Configuration and deserializing
- * the string. If InputJobInfo is not present in the configuration, throws an
+ * Gets the JobInfo object by reading the Configuration and deserializing
+ * the string. If JobInfo is not present in the configuration, throws an
* exception since that means HCatInputFormat.setInput has not been called.
* @param jobContext the job context
- * @return the InputJobInfo object
+ * @return the JobInfo object
* @throws Exception the exception
*/
- private static InputJobInfo getJobInfo(JobContext jobContext) throws Exception {
+ private static JobInfo getJobInfo(JobContext jobContext) throws Exception {
String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
if( jobString == null ) {
throw new Exception("job information not found in JobContext. HCatInputFormat.setInput() not called?");
}
- return (InputJobInfo) HCatUtil.deserialize(jobString);
+ return (JobInfo) HCatUtil.deserialize(jobString);
}
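
The getJobInfo change above keeps the same transport either way: setInput serializes the job description into the configuration under HCAT_KEY_JOB_INFO, and every later call deserializes it back. A minimal sketch of that round trip, using only the helpers referenced in this diff:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.common.HCatUtil;
    import org.apache.hcatalog.mapreduce.JobInfo;

    class JobInfoRoundTrip {
        // Mirrors the pattern above: the client side stores a serialized
        // JobInfo in the job configuration; task-side code reads it back.
        // A missing key means HCatInputFormat.setInput() was never called.
        static JobInfo readBack(Job job) throws Exception {
            Configuration conf = job.getConfiguration();
            String jobString = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
            if (jobString == null) {
                throw new Exception(
                    "job information not found; HCatInputFormat.setInput() not called?");
            }
            return (JobInfo) HCatUtil.deserialize(jobString);
        }
    }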
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java Mon Aug 1 17:17:04 2011
@@ -18,6 +18,14 @@
package org.apache.hcatalog.mapreduce;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.io.Writable;
@@ -31,11 +39,6 @@ import org.apache.hcatalog.common.HCatUt
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
public abstract class HCatBaseOutputFormat extends OutputFormat<WritableComparable<?>, HCatRecord> {
// static final private Log LOG = LogFactory.getLog(HCatBaseOutputFormat.class);
@@ -49,7 +52,7 @@ public abstract class HCatBaseOutputForm
*/
public static HCatSchema getTableSchema(JobContext context) throws IOException {
OutputJobInfo jobInfo = getJobInfo(context);
- return jobInfo.getTableInfo().getDataColumns();
+ return jobInfo.getTableSchema();
}
/**
@@ -81,7 +84,7 @@ public abstract class HCatBaseOutputForm
/**
* Gets the HCatOutputJobInfo object by reading the Configuration and deserializing
- * the string. If InputJobInfo is not present in the configuration, throws an
+ * the string. If JobInfo is not present in the configuration, throws an
* exception since that means HCatOutputFormat.setOutput has not been called.
* @param jobContext the job context
* @return the OutputJobInfo object
@@ -122,15 +125,15 @@ public abstract class HCatBaseOutputForm
try {
Class<? extends HCatOutputStorageDriver> driverClass =
(Class<? extends HCatOutputStorageDriver>)
- Class.forName(jobInfo.getTableInfo().getStorerInfo().getOutputSDClass());
+ Class.forName(jobInfo.getStorerInfo().getOutputSDClass());
HCatOutputStorageDriver driver = driverClass.newInstance();
- Map<String, String> partitionValues = jobInfo.getPartitionValues();
+ Map<String, String> partitionValues = jobInfo.getTableInfo().getPartitionValues();
String location = jobInfo.getLocation();
if (dynamicPartVals != null){
// dynamic part vals specified
- List<String> dynamicPartKeys = jobInfo.getDynamicPartitioningKeys();
+ List<String> dynamicPartKeys = jobInfo.getTableInfo().getDynamicPartitioningKeys();
if (dynamicPartVals.size() != dynamicPartKeys.size()){
throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
"Unable to instantiate dynamic partitioning storage driver, mismatch between"
@@ -142,7 +145,7 @@ public abstract class HCatBaseOutputForm
}
// re-home location, now that we know the rest of the partvals
- Table table = jobInfo.getTableInfo().getTable();
+ Table table = jobInfo.getTable();
List<String> partitionCols = new ArrayList<String>();
for(FieldSchema schema : table.getPartitionKeys()) {
@@ -161,7 +164,7 @@ public abstract class HCatBaseOutputForm
// HCatUtil.logMap(LOG,"Setting outputPath ["+location+"] for ",partitionValues);
- driver.initialize(jobContext, jobInfo.getTableInfo().getStorerInfo().getProperties());
+ driver.initialize(jobContext, jobInfo.getStorerInfo().getProperties());
return driver;
} catch(Exception e) {
@@ -176,7 +179,7 @@ public abstract class HCatBaseOutputForm
/**
* Gets the output storage driver instance, with allowing specification
* of partvals from which it picks the dynamic partvals
- * @param context the job context
+ * @param jobContext the job context
* @param jobInfo the output job info
* @return the output driver instance
* @throws IOException
@@ -185,7 +188,7 @@ public abstract class HCatBaseOutputForm
protected static HCatOutputStorageDriver getOutputDriverInstance(
JobContext context, OutputJobInfo jobInfo,
Map<String, String> fullPartSpec) throws IOException {
- List<String> dynamicPartKeys = jobInfo.getDynamicPartitioningKeys();
+ List<String> dynamicPartKeys = jobInfo.getTableInfo().getDynamicPartitioningKeys();
if ((dynamicPartKeys == null)||(dynamicPartKeys.isEmpty())){
return getOutputDriverInstance(context,jobInfo,(List<String>)null);
}else{
@@ -224,8 +227,8 @@ public abstract class HCatBaseOutputForm
// These would be partition keys too, so would also need to be removed from
// output schema and partcols
- if (jobInfo.isDynamicPartitioningUsed()){
- for (String partKey : jobInfo.getDynamicPartitioningKeys()){
+ if (jobInfo.getTableInfo().isDynamicPartitioningUsed()){
+ for (String partKey : jobInfo.getTableInfo().getDynamicPartitioningKeys()){
Integer idx;
if((idx = schema.getPosition(partKey)) != null){
posOfPartCols.add(idx);
@@ -235,7 +238,7 @@ public abstract class HCatBaseOutputForm
}
}
- HCatUtil.validatePartitionSchema(jobInfo.getTableInfo().getTable(), schemaWithoutParts);
+ HCatUtil.validatePartitionSchema(jobInfo.getTable(), schemaWithoutParts);
jobInfo.setPosOfPartCols(posOfPartCols);
jobInfo.setPosOfDynPartCols(posOfDynPartCols);
jobInfo.setOutputSchema(schemaWithoutParts);
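
getOutputDriverInstance above zips the dynamic partition keys recorded at setup time with the values discovered at write time, failing fast on a size mismatch. A minimal standalone sketch of that pairing (HCatException swapped for IllegalArgumentException to keep the example self-contained):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class DynamicPartitionPairing {
        // Keys and values must line up one-to-one, in the order the keys
        // were recorded; the result feeds the storage driver's location.
        static Map<String, String> pair(List<String> keys, List<String> vals) {
            if (vals.size() != keys.size()) {
                throw new IllegalArgumentException(
                    "mismatch between number of dynamic partition keys and values");
            }
            Map<String, String> partitionValues = new LinkedHashMap<String, String>();
            for (int i = 0; i < keys.size(); i++) {
                partitionValues.put(keys.get(i), vals.get(i));
            }
            return partitionValues;
        }
    }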
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java Mon Aug 1 17:17:04 2011
@@ -18,6 +18,15 @@
package org.apache.hcatalog.mapreduce;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -30,11 +39,6 @@ import org.apache.hcatalog.common.HCatUt
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
-
/** The InputFormat to use to read data from HCat */
public class HCatEximInputFormat extends HCatBaseInputFormat {
@@ -64,7 +68,8 @@ public class HCatEximInputFormat extends
Map.Entry<org.apache.hadoop.hive.metastore.api.Table, List<Partition>> tp = EximUtil
.readMetaData(fs, metadataPath);
org.apache.hadoop.hive.metastore.api.Table table = tp.getKey();
- InputJobInfo inputInfo = InputJobInfo.create(table.getDbName(), table.getTableName(),null,null,null);
+ HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(null,
+ null, table.getDbName(), table.getTableName());
List<FieldSchema> partCols = table.getPartitionKeys();
List<PartInfo> partInfoList = null;
if (partCols.size() > 0) {
@@ -93,11 +98,11 @@ public class HCatEximInputFormat extends
PartInfo partInfo = new PartInfo(schema, inputStorageDriverClass, location + "/data", hcatProperties);
partInfoList.add(partInfo);
}
- inputInfo.setPartitions(partInfoList);
- inputInfo.setTableInfo(HCatTableInfo.valueOf(table));
+ JobInfo hcatJobInfo = new JobInfo(inputInfo,
+ HCatUtil.getTableSchemaWithPtnCols(table), partInfoList);
job.getConfiguration().set(
HCatConstants.HCAT_KEY_JOB_INFO,
- HCatUtil.serialize(inputInfo));
+ HCatUtil.serialize(hcatJobInfo));
List<HCatSchema> rv = new ArrayList<HCatSchema>(2);
rv.add(HCatSchemaUtils.getHCatSchema(table.getSd().getCols()));
rv.add(HCatSchemaUtils.getHCatSchema(partCols));
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java Mon Aug 1 17:17:04 2011
@@ -18,6 +18,13 @@
package org.apache.hcatalog.mapreduce;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -33,16 +40,10 @@ import org.apache.hadoop.hive.ql.parse.E
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
public class HCatEximOutputCommitter extends HCatBaseOutputCommitter {
private static final Log LOG = LogFactory.getLog(HCatEximOutputCommitter.class);
@@ -64,7 +65,7 @@ public class HCatEximOutputCommitter ext
Configuration conf = jobContext.getConfiguration();
FileSystem fs;
try {
- fs = FileSystem.get(new URI(jobInfo.getTableInfo().getTable().getSd().getLocation()), conf);
+ fs = FileSystem.get(new URI(jobInfo.getTable().getSd().getLocation()), conf);
} catch (URISyntaxException e) {
throw new IOException(e);
}
@@ -74,7 +75,7 @@ public class HCatEximOutputCommitter ext
private static void doCleanup(OutputJobInfo jobInfo, FileSystem fs) throws IOException,
HCatException {
try {
- Table ttable = jobInfo.getTableInfo().getTable();
+ Table ttable = jobInfo.getTable();
org.apache.hadoop.hive.ql.metadata.Table table = new org.apache.hadoop.hive.ql.metadata.Table(
ttable);
StorageDescriptor tblSD = ttable.getSd();
@@ -95,7 +96,7 @@ public class HCatEximOutputCommitter ext
}
}
if (!table.getPartitionKeys().isEmpty()) {
- Map<String, String> partitionValues = jobInfo.getPartitionValues();
+ Map<String, String> partitionValues = jobInfo.getTableInfo().getPartitionValues();
org.apache.hadoop.hive.ql.metadata.Partition partition =
new org.apache.hadoop.hive.ql.metadata.Partition(table,
partitionValues,
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java Mon Aug 1 17:17:04 2011
@@ -18,6 +18,12 @@
package org.apache.hcatalog.mapreduce;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -31,7 +37,11 @@ import org.apache.hadoop.hive.ql.io.RCFi
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatException;
@@ -43,12 +53,6 @@ import org.apache.hcatalog.data.schema.H
import org.apache.hcatalog.rcfile.RCFileInputDriver;
import org.apache.hcatalog.rcfile.RCFileOutputDriver;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.TreeMap;
-
/**
* The OutputFormat to use to write data to HCat without a hcat server. This can then
* be imported into a hcat instance, or used with a HCatEximInputFormat. As in
@@ -124,7 +128,8 @@ public class HCatEximOutputFormat extend
}
}
StorerInfo storerInfo = new StorerInfo(isdname, osdname, new Properties());
- OutputJobInfo outputJobInfo = OutputJobInfo.create(dbname,tablename,partSpec,null,null);
+ HCatTableInfo outputInfo = HCatTableInfo.getOutputTableInfo(null, null, dbname, tablename,
+ partSpec);
org.apache.hadoop.hive.ql.metadata.Table tbl = new
org.apache.hadoop.hive.ql.metadata.Table(dbname, tablename);
Table table = tbl.getTTable();
@@ -141,17 +146,16 @@ public class HCatEximOutputFormat extend
StorageDescriptor sd = table.getSd();
sd.setLocation(location);
String dataLocation = location + "/" + partname;
- outputJobInfo.setTableInfo(new HCatTableInfo(dbname,tablename,columnSchema,null,storerInfo,table));
- outputJobInfo.setOutputSchema(columnSchema);
- outputJobInfo.setLocation(dataLocation);
- setPartDetails(outputJobInfo, columnSchema, partSpec);
- sd.setCols(HCatUtil.getFieldSchemaList(outputJobInfo.getOutputSchema().getFields()));
+ OutputJobInfo jobInfo = new OutputJobInfo(outputInfo,
+ columnSchema, columnSchema, storerInfo, dataLocation, table);
+ setPartDetails(jobInfo, columnSchema, partSpec);
+ sd.setCols(HCatUtil.getFieldSchemaList(jobInfo.getOutputSchema().getFields()));
sd.setInputFormat(ifname);
sd.setOutputFormat(ofname);
SerDeInfo serdeInfo = sd.getSerdeInfo();
serdeInfo.setSerializationLib(serializationLib);
Configuration conf = job.getConfiguration();
- conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+ conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
} catch (IOException e) {
throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
} catch (MetaException e) {
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java Mon Aug 1 17:17:04 2011
@@ -18,10 +18,10 @@
package org.apache.hcatalog.mapreduce;
-import org.apache.hadoop.mapreduce.Job;
-
import java.io.IOException;
+import org.apache.hadoop.mapreduce.Job;
+
/** The InputFormat to use to read data from HCat */
public class HCatInputFormat extends HCatBaseInputFormat {
@@ -31,13 +31,13 @@ public class HCatInputFormat extends HCa
* the information in the conf object. The inputInfo object is updated with
* information needed in the client context
* @param job the job object
- * @param inputJobInfo the input info for table to read
+ * @param inputInfo the table input info
* @throws IOException the exception in communicating with the metadata server
*/
public static void setInput(Job job,
- InputJobInfo inputJobInfo) throws IOException {
+ HCatTableInfo inputInfo) throws IOException {
try {
- InitializeInput.setInput(job, inputJobInfo);
+ InitializeInput.setInput(job, inputInfo);
} catch (Exception e) {
throw new IOException(e);
}
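
The restored setInput accepts a partition filter through the five-argument getInputTableInfo overload (defined in the HCatTableInfo diff below). A minimal sketch of a filtered read; the filter expression and all names are placeholders:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.HCatTableInfo;

    class FilteredReadSetup {
        static void configure(Job job) throws IOException {
            // Placeholder filter: restrict the read to one partition.
            HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(
                "thrift://metastore-host:9083", null,
                "default", "mytable", "ds=\"20110801\"");
            HCatInputFormat.setInput(job, inputInfo);
        }
    }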
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java Mon Aug 1 17:17:04 2011
@@ -17,6 +17,17 @@
*/
package org.apache.hcatalog.mapreduce;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -26,11 +37,18 @@ import org.apache.hadoop.hive.common.Fil
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.Constants;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hcatalog.common.ErrorType;
@@ -43,11 +61,6 @@ import org.apache.hcatalog.data.schema.H
import org.apache.hcatalog.har.HarOutputCommitterPostProcessor;
import org.apache.thrift.TException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.*;
-import java.util.Map.Entry;
-
public class HCatOutputCommitter extends OutputCommitter {
// static final private Log LOG = LogFactory.getLog(HCatOutputCommitter.class);
@@ -67,7 +80,7 @@ public class HCatOutputCommitter extends
public HCatOutputCommitter(JobContext context, OutputCommitter baseCommitter) throws IOException {
OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
- dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed();
+ dynamicPartitioningUsed = jobInfo.getTableInfo().isDynamicPartitioningUsed();
if (!dynamicPartitioningUsed){
this.baseCommitter = baseCommitter;
this.partitionsDiscovered = true;
@@ -148,7 +161,7 @@ public class HCatOutputCommitter extends
try {
HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(
- jobInfo.getServerUri(), jobContext.getConfiguration());
+ jobInfo.getTableInfo().getServerUri(), jobContext.getConfiguration());
// cancel the deleg. tokens that were acquired for this job now that
// we are done - we should cancel if the tokens were acquired by
// HCatOutputFormat and not if they were supplied by Oozie. In the latter
@@ -176,7 +189,7 @@ public class HCatOutputCommitter extends
Path src;
if (dynamicPartitioningUsed){
src = new Path(getPartitionRootLocation(
- jobInfo.getLocation().toString(),jobInfo.getTableInfo().getTable().getPartitionKeysSize()
+ jobInfo.getLocation().toString(),jobInfo.getTable().getPartitionKeysSize()
));
}else{
src = new Path(jobInfo.getLocation());
@@ -231,7 +244,7 @@ public class HCatOutputCommitter extends
OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
Configuration conf = context.getConfiguration();
- Table table = jobInfo.getTableInfo().getTable();
+ Table table = jobInfo.getTable();
Path tblPath = new Path(table.getSd().getLocation());
FileSystem fs = tblPath.getFileSystem(conf);
@@ -270,7 +283,7 @@ public class HCatOutputCommitter extends
List<Partition> partitionsAdded = new ArrayList<Partition>();
try {
- client = HCatOutputFormat.createHiveClient(jobInfo.getServerUri(), conf);
+ client = HCatOutputFormat.createHiveClient(tableInfo.getServerUri(), conf);
StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters());
@@ -285,7 +298,7 @@ public class HCatOutputCommitter extends
partitionsToAdd.add(
constructPartition(
context,
- tblPath.toString(), jobInfo.getPartitionValues()
+ tblPath.toString(), tableInfo.getPartitionValues()
,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
,table, fs
,grpName,perms));
@@ -303,35 +316,35 @@ public class HCatOutputCommitter extends
//Publish the new partition(s)
if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
+
+ Path src = new Path(ptnRootLocation);
- Path src = new Path(ptnRootLocation);
-
- // check here for each dir we're copying out, to see if it already exists, error out if so
- moveTaskOutputs(fs, src, src, tblPath,true);
-
- moveTaskOutputs(fs, src, src, tblPath,false);
- fs.delete(src, true);
-
+ // check here for each dir we're copying out, to see if it already exists, error out if so
+ moveTaskOutputs(fs, src, src, tblPath,true);
+
+ moveTaskOutputs(fs, src, src, tblPath,false);
+ fs.delete(src, true);
+
+
+// for (Partition partition : partitionsToAdd){
+// partitionsAdded.add(client.add_partition(partition));
+// // currently following add_partition instead of add_partitions because latter isn't
+// // all-or-nothing and we want to be able to roll back partitions we added if need be.
+// }
- // for (Partition partition : partitionsToAdd){
- // partitionsAdded.add(client.add_partition(partition));
- // // currently following add_partition instead of add_partitions because latter isn't
- // // all-or-nothing and we want to be able to roll back partitions we added if need be.
- // }
-
- try {
- client.add_partitions(partitionsToAdd);
- partitionsAdded = partitionsToAdd;
- } catch (Exception e){
- // There was an error adding partitions : rollback fs copy and rethrow
- for (Partition p : partitionsToAdd){
- Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
- if (fs.exists(ptnPath)){
- fs.delete(ptnPath,true);
- }
- }
- throw e;
- }
+ try {
+ client.add_partitions(partitionsToAdd);
+ partitionsAdded = partitionsToAdd;
+ } catch (Exception e){
+ // There was an error adding partitions : rollback fs copy and rethrow
+ for (Partition p : partitionsToAdd){
+ Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
+ if (fs.exists(ptnPath)){
+ fs.delete(ptnPath,true);
+ }
+ }
+ throw e;
+ }
}else{
// no harProcessor, regular operation
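
The re-indented block above preserves the all-or-nothing publish: add_partitions is attempted once and, if it fails, every partition directory already moved into the table path is deleted before the exception is rethrown, so the filesystem matches the unchanged metastore state. A minimal sketch of that rollback shape; the real code resolves the HAR parent path via harProcessor.getParentFSPath first, simplified here to the partition location:

    import java.util.List;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.Partition;

    class PublishWithRollback {
        // On failure, roll back the filesystem moves so the table directory
        // matches the (unchanged) metastore state, then rethrow.
        static void publish(HiveMetaStoreClient client, FileSystem fs,
                            List<Partition> partitionsToAdd) throws Exception {
            try {
                client.add_partitions(partitionsToAdd);
            } catch (Exception e) {
                for (Partition p : partitionsToAdd) {
                    // Simplification: delete the partition location directly.
                    Path ptnPath = new Path(p.getSd().getLocation());
                    if (fs.exists(ptnPath)) {
                        fs.delete(ptnPath, true);
                    }
                }
                throw e;
            }
        }
    }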
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java Mon Aug 1 17:17:04 2011
@@ -18,6 +18,17 @@
package org.apache.hcatalog.mapreduce;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -30,11 +41,17 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -49,10 +66,6 @@ import org.apache.hcatalog.data.HCatReco
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.thrift.TException;
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-
/** The OutputFormat to use to write data to HCat. The key value is ignored
* and should be given as null. The value is the HCatRecord to write.*/
public class HCatOutputFormat extends HCatBaseOutputFormat {
@@ -81,41 +94,41 @@ public class HCatOutputFormat extends HC
* Set the info about the output to write for the Job. This queries the metadata server
* to find the StorageDriver to use for the table. Throws error if partition is already published.
* @param job the job object
- * @param outputJobInfo the table output info
+ * @param outputInfo the table output info
* @throws IOException the exception in communicating with the metadata server
*/
@SuppressWarnings("unchecked")
- public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException {
+ public static void setOutput(Job job, HCatTableInfo outputInfo) throws IOException {
HiveMetaStoreClient client = null;
try {
Configuration conf = job.getConfiguration();
- client = createHiveClient(outputJobInfo.getServerUri(), conf);
- Table table = client.getTable(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName());
+ client = createHiveClient(outputInfo.getServerUri(), conf);
+ Table table = client.getTable(outputInfo.getDatabaseName(), outputInfo.getTableName());
if (table.getPartitionKeysSize() == 0 ){
- if ((outputJobInfo.getPartitionValues() != null) && (!outputJobInfo.getPartitionValues().isEmpty())){
+ if ((outputInfo.getPartitionValues() != null) && (!outputInfo.getPartitionValues().isEmpty())){
// attempt made to save partition values in non-partitioned table - throw error.
throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
"Partition values specified for non-partitioned table");
}
// non-partitioned table
- outputJobInfo.setPartitionValues(new HashMap<String, String>());
+ outputInfo.setPartitionValues(new HashMap<String, String>());
} else {
// partitioned table, we expect partition values
// convert user specified map to have lower case key names
Map<String, String> valueMap = new HashMap<String, String>();
- if (outputJobInfo.getPartitionValues() != null){
- for(Map.Entry<String, String> entry : outputJobInfo.getPartitionValues().entrySet()) {
+ if (outputInfo.getPartitionValues() != null){
+ for(Map.Entry<String, String> entry : outputInfo.getPartitionValues().entrySet()) {
valueMap.put(entry.getKey().toLowerCase(), entry.getValue());
}
}
if (
- (outputJobInfo.getPartitionValues() == null)
- || (outputJobInfo.getPartitionValues().size() < table.getPartitionKeysSize())
+ (outputInfo.getPartitionValues() == null)
+ || (outputInfo.getPartitionValues().size() < table.getPartitionKeysSize())
){
// dynamic partition usecase - partition values were null, or not all were specified
// need to figure out which keys are not specified.
@@ -132,7 +145,7 @@ public class HCatOutputFormat extends HC
throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,"Invalid partition keys specified");
}
- outputJobInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys);
+ outputInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys);
String dynHash;
if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null){
dynHash = String.valueOf(Math.random());
@@ -144,11 +157,11 @@ public class HCatOutputFormat extends HC
}
- outputJobInfo.setPartitionValues(valueMap);
+ outputInfo.setPartitionValues(valueMap);
}
//Handle duplicate publish
- handleDuplicatePublish(job, outputJobInfo, client, table);
+ handleDuplicatePublish(job, outputInfo, client, table);
StorageDescriptor tblSD = table.getSd();
HCatSchema tableSchema = HCatUtil.extractSchemaFromStorageDescriptor(tblSD);
@@ -166,15 +179,14 @@ public class HCatOutputFormat extends HC
String tblLocation = tblSD.getLocation();
String location = driver.getOutputLocation(job,
tblLocation, partitionCols,
- outputJobInfo.getPartitionValues(),conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID));
+ outputInfo.getPartitionValues(),conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID));
//Serialize the output info into the configuration
- outputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
- outputJobInfo.setOutputSchema(tableSchema);
- outputJobInfo.setLocation(location);
- outputJobInfo.setHarRequested(harRequested);
- outputJobInfo.setMaximumDynamicPartitions(maxDynamicPartitions);
- conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+ OutputJobInfo jobInfo = new OutputJobInfo(outputInfo,
+ tableSchema, tableSchema, storerInfo, location, table);
+ jobInfo.setHarRequested(harRequested);
+ jobInfo.setMaximumDynamicPartitions(maxDynamicPartitions);
+ conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
Path tblPath = new Path(tblLocation);
@@ -221,7 +233,7 @@ public class HCatOutputFormat extends HC
// will correctly pick the right tokens which the committer will use and
// cancel.
- String tokenSignature = getTokenSignature(outputJobInfo);
+ String tokenSignature = getTokenSignature(outputInfo);
if(tokenMap.get(tokenSignature) == null) {
// get delegation tokens from hcat server and store them into the "job"
// These will be used in the HCatOutputCommitter to publish partitions to
@@ -271,17 +283,17 @@ public class HCatOutputFormat extends HC
// a signature string to associate with a HCatTableInfo - essentially
// a concatenation of dbname, tablename and partition keyvalues.
- private static String getTokenSignature(OutputJobInfo outputJobInfo) {
+ private static String getTokenSignature(HCatTableInfo outputInfo) {
StringBuilder result = new StringBuilder("");
- String dbName = outputJobInfo.getDatabaseName();
+ String dbName = outputInfo.getDatabaseName();
if(dbName != null) {
result.append(dbName);
}
- String tableName = outputJobInfo.getTableName();
+ String tableName = outputInfo.getTableName();
if(tableName != null) {
result.append("+" + tableName);
}
- Map<String, String> partValues = outputJobInfo.getPartitionValues();
+ Map<String, String> partValues = outputInfo.getPartitionValues();
if(partValues != null) {
for(Entry<String, String> entry: partValues.entrySet()) {
result.append("+" + entry.getKey() + "=" + entry.getValue());
@@ -302,7 +314,7 @@ public class HCatOutputFormat extends HC
* @throws MetaException
* @throws TException
*/
- private static void handleDuplicatePublish(Job job, OutputJobInfo outputInfo,
+ private static void handleDuplicatePublish(Job job, HCatTableInfo outputInfo,
HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException {
/*
@@ -354,7 +366,7 @@ public class HCatOutputFormat extends HC
public static void setSchema(final Job job, final HCatSchema schema) throws IOException {
OutputJobInfo jobInfo = getJobInfo(job);
- Map<String,String> partMap = jobInfo.getPartitionValues();
+ Map<String,String> partMap = jobInfo.getTableInfo().getPartitionValues();
setPartDetails(jobInfo, schema, partMap);
job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
}
@@ -461,7 +473,7 @@ public class HCatOutputFormat extends HC
OutputJobInfo info = HCatBaseOutputFormat.getJobInfo(context);
// Path workFile = osd.getWorkFilePath(context,info.getLocation());
Path workFile = osd.getWorkFilePath(context,context.getConfiguration().get("mapred.output.dir"));
- Path tblPath = new Path(info.getTableInfo().getTable().getSd().getLocation());
+ Path tblPath = new Path(info.getTable().getSd().getLocation());
FileSystem fs = tblPath.getFileSystem(context.getConfiguration());
FileStatus tblPathStat = fs.getFileStatus(tblPath);
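
Under the restored API, write setup goes through HCatTableInfo.getOutputTableInfo plus setOutput, after which setSchema publishes the schema the writer will produce. A minimal sketch using the signatures shown in this diff; all names and partition values are placeholders:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.mapreduce.HCatOutputFormat;
    import org.apache.hcatalog.mapreduce.HCatTableInfo;

    class RevertedWriteSetup {
        static void configure(Job job) throws IOException {
            Map<String, String> partitionValues = new HashMap<String, String>();
            partitionValues.put("ds", "20110801");  // placeholder partition
            HCatTableInfo outputInfo = HCatTableInfo.getOutputTableInfo(
                "thrift://metastore-host:9083",     // serverUri (placeholder)
                null, "default", "mytable", partitionValues);
            HCatOutputFormat.setOutput(job, outputInfo);
            // Publish the schema the writer will produce; here simply the
            // table schema queried back from the configured job.
            HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
        }
    }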
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java Mon Aug 1 17:17:04 2011
@@ -62,7 +62,7 @@ public class HCatRecordWriter extends Re
// If partition columns occur in data, we want to remove them.
partColsToDel = jobInfo.getPosOfPartCols();
- dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed();
+ dynamicPartitioningUsed = jobInfo.getTableInfo().isDynamicPartitioningUsed();
dynamicPartCols = jobInfo.getPosOfDynPartCols();
maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java Mon Aug 1 17:17:04 2011
@@ -18,13 +18,11 @@
package org.apache.hcatalog.mapreduce;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.schema.HCatSchema;
-
-import java.io.IOException;
import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
/**
*
@@ -37,47 +35,128 @@ public class HCatTableInfo implements Se
private static final long serialVersionUID = 1L;
+ public enum TableInfoType {
+ INPUT_INFO,
+ OUTPUT_INFO
+ };
+
+ private final TableInfoType tableInfoType;
+
+ /** The Metadata server uri */
+ private final String serverUri;
+
+ /** If the hcat server is configured to work with hadoop security, this
+ * variable will hold the principal name of the server - this will be used
+ * in the authentication to the hcat server using kerberos
+ */
+ private final String serverKerberosPrincipal;
+
/** The db and table names */
private final String dbName;
private final String tableName;
- /** The table schema. */
- private final HCatSchema dataColumns;
- private final HCatSchema partitionColumns;
+ /** The partition filter */
+ private String filter;
+
+ /** The partition predicates to filter on, an arbitrary AND/OR filter, if used for input */
+ private final String partitionPredicates;
- /** The table being written to */
- private final Table table;
+ /** The information about the partitions matching the specified query */
+ private JobInfo jobInfo;
- /** The storer info */
- private StorerInfo storerInfo;
+ /** The partition values to publish to, if used for output*/
+ private Map<String, String> partitionValues;
+
+ /** List of keys for which values were not specified at write setup time, to be inferred at write time */
+ private List<String> dynamicPartitioningKeys;
+
/**
* Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat}
* for reading data from a table.
+ * @param serverUri the Metadata server uri
+ * @param serverKerberosPrincipal If the hcat server is configured to
* work with hadoop security, the kerberos principal name of the server - else null
* The principal name should be of the form:
* <servicename>/_HOST@<realm> like "hcat/_HOST@myrealm.com"
* The special string _HOST will be replaced automatically with the correct host name
* @param dbName the db name
* @param tableName the table name
- * @param dataColumns schema of columns which contain data
- * @param partitionColumns schema of partition columns
- * @param storerInfo information about storage descriptor
- * @param table hive metastore table class
*/
- HCatTableInfo(
+ public static HCatTableInfo getInputTableInfo(String serverUri,
+ String serverKerberosPrincipal,
String dbName,
- String tableName,
- HCatSchema dataColumns,
- HCatSchema partitionColumns,
- StorerInfo storerInfo,
- Table table) {
+ String tableName) {
+ return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName, tableName, (String) null);
+ }
+
+ /**
+ * Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat}
+ * for reading data from a table.
+ * @param serverUri the Metadata server uri
+ * @param serverKerberosPrincipal If the hcat server is configured to
+ * work with hadoop security, the kerberos principal name of the server - else null
+ * The principal name should be of the form:
+ * <servicename>/_HOST@<realm> like "hcat/_HOST@myrealm.com"
+ * The special string _HOST will be replaced automatically with the correct host name
+ * @param dbName the db name
+ * @param tableName the table name
+ * @param filter the partition filter
+ */
+ public static HCatTableInfo getInputTableInfo(String serverUri, String serverKerberosPrincipal, String dbName,
+ String tableName, String filter) {
+ return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName, tableName, filter);
+ }
+
+ private HCatTableInfo(String serverUri, String serverKerberosPrincipal,
+ String dbName, String tableName, String filter) {
+ this.serverUri = serverUri;
+ this.serverKerberosPrincipal = serverKerberosPrincipal;
+ this.dbName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+ this.tableName = tableName;
+ this.partitionPredicates = null;
+ this.partitionValues = null;
+ this.tableInfoType = TableInfoType.INPUT_INFO;
+ this.filter = filter;
+ }
+ /**
+ * Initializes a new HCatTableInfo instance to be used with {@link HCatOutputFormat}
+ * for writing data to a table.
+ * @param serverUri the Metadata server uri
+ * @param serverKerberosPrincipal If the hcat server is configured to
+ * work with hadoop security, the kerberos principal name of the server - else null
+ * The principal name should be of the form:
+ * <servicename>/_HOST@<realm> like "hcat/_HOST@myrealm.com"
+ * The special string _HOST will be replaced automatically with the correct host name
+ * @param dbName the db name
+ * @param tableName the table name
+ * @param partitionValues The partition values to publish to; can be null or an empty Map to
+ * indicate a write to an unpartitioned table. For partitioned tables, this map should
+ * contain keys for all partition columns with corresponding values.
+ */
+ public static HCatTableInfo getOutputTableInfo(String serverUri,
+ String serverKerberosPrincipal, String dbName, String tableName, Map<String, String> partitionValues){
+ return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName,
+ tableName, partitionValues);
+ }
+
+ private HCatTableInfo(String serverUri, String serverKerberosPrincipal,
+ String dbName, String tableName, Map<String, String> partitionValues){
+ this.serverUri = serverUri;
+ this.serverKerberosPrincipal = serverKerberosPrincipal;
this.dbName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
this.tableName = tableName;
- this.dataColumns = dataColumns;
- this.table = table;
- this.storerInfo = storerInfo;
- this.partitionColumns = partitionColumns;
+ this.partitionPredicates = null;
+ this.partitionValues = partitionValues;
+ this.tableInfoType = TableInfoType.OUTPUT_INFO;
+ }
+
+ /**
+ * Gets the value of serverUri
+ * @return the serverUri
+ */
+ public String getServerUri() {
+ return serverUri;
}
/**
@@ -97,80 +176,97 @@ public class HCatTableInfo implements Se
}
/**
- * @return return schema of data columns as defined in meta store
+ * Gets the value of partitionPredicates
+ * @return the partitionPredicates
*/
- public HCatSchema getDataColumns() {
- return dataColumns;
+ public String getPartitionPredicates() {
+ return partitionPredicates;
}
/**
- * @return schema of partition columns
+ * Gets the value of partitionValues
+ * @return the partitionValues
*/
- public HCatSchema getPartitionColumns() {
- return partitionColumns;
+ public Map<String, String> getPartitionValues() {
+ return partitionValues;
}
/**
- * @return the storerInfo
+ * Gets the value of job info
+ * @return the job info
*/
- public StorerInfo getStorerInfo() {
- return storerInfo;
+ public JobInfo getJobInfo() {
+ return jobInfo;
}
/**
- * minimize dependency on hive classes so this is package private
- * this should eventually no longer be used
- * @return hive metastore representation of table
+ * Sets the value of jobInfo
+ * @param jobInfo the jobInfo to set
*/
- Table getTable() {
- return table;
+ public void setJobInfo(JobInfo jobInfo) {
+ this.jobInfo = jobInfo;
+ }
+
+ public TableInfoType getTableType(){
+ return this.tableInfoType;
}
/**
- * create an HCatTableInfo instance from the supplied Hive Table instance
- * @param table to create an instance from
- * @return HCatTableInfo
- * @throws IOException
+ * Sets the value of partitionValues
+ * @param partitionValues the partition values to set
*/
- static HCatTableInfo valueOf(Table table) throws IOException {
- HCatSchema dataColumns = HCatUtil.extractSchemaFromStorageDescriptor(table.getSd());
- StorerInfo storerInfo = InitializeInput.extractStorerInfo(table.getSd(), table.getParameters());
- HCatSchema partitionColumns = HCatUtil.getPartitionColumns(table);
- return new HCatTableInfo(table.getDbName(),
- table.getTableName(),
- dataColumns,
- partitionColumns,
- storerInfo,
- table);
+ void setPartitionValues(Map<String, String> partitionValues) {
+ this.partitionValues = partitionValues;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- HCatTableInfo tableInfo = (HCatTableInfo) o;
-
- if (dataColumns != null ? !dataColumns.equals(tableInfo.dataColumns) : tableInfo.dataColumns != null) return false;
- if (dbName != null ? !dbName.equals(tableInfo.dbName) : tableInfo.dbName != null) return false;
- if (partitionColumns != null ? !partitionColumns.equals(tableInfo.partitionColumns) : tableInfo.partitionColumns != null)
- return false;
- if (storerInfo != null ? !storerInfo.equals(tableInfo.storerInfo) : tableInfo.storerInfo != null) return false;
- if (table != null ? !table.equals(tableInfo.table) : tableInfo.table != null) return false;
- if (tableName != null ? !tableName.equals(tableInfo.tableName) : tableInfo.tableName != null) return false;
+ /**
+ * Gets the value of partition filter
+ * @return the filter string
+ */
+ public String getFilter() {
+ return filter;
+ }
- return true;
+ /**
+ * @return the serverKerberosPrincipal
+ */
+ public String getServerKerberosPrincipal() {
+ return serverKerberosPrincipal;
+ }
+
+ /**
+ * Returns whether or not Dynamic Partitioning is used
+ * @return whether or not dynamic partitioning is currently enabled and used
+ */
+ public boolean isDynamicPartitioningUsed() {
+ return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty()));
+ }
+
+ /**
+ * Sets the list of dynamic partitioning keys used for outputting without specifying all the keys
+ * @param dynamicPartitioningKeys
+ */
+ public void setDynamicPartitioningKeys(List<String> dynamicPartitioningKeys) {
+ this.dynamicPartitioningKeys = dynamicPartitioningKeys;
+ }
+
+ public List<String> getDynamicPartitioningKeys(){
+ return this.dynamicPartitioningKeys;
}
@Override
public int hashCode() {
- int result = dbName != null ? dbName.hashCode() : 0;
- result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
- result = 31 * result + (dataColumns != null ? dataColumns.hashCode() : 0);
- result = 31 * result + (partitionColumns != null ? partitionColumns.hashCode() : 0);
- result = 31 * result + (table != null ? table.hashCode() : 0);
- result = 31 * result + (storerInfo != null ? storerInfo.hashCode() : 0);
+ int result = 17;
+ result = 31*result + (serverUri == null ? 0 : serverUri.hashCode());
+ result = 31*result + (serverKerberosPrincipal == null ? 0 : serverKerberosPrincipal.hashCode());
+ result = 31*result + (dbName == null? 0 : dbName.hashCode());
+ result = 31*result + tableName.hashCode();
+ result = 31*result + (filter == null? 0 : filter.hashCode());
+ result = 31*result + (partitionPredicates == null ? 0 : partitionPredicates.hashCode());
+ result = 31*result + tableInfoType.ordinal();
+ result = 31*result + (partitionValues == null ? 0 : partitionValues.hashCode());
+ result = 31*result + (dynamicPartitioningKeys == null ? 0 : dynamicPartitioningKeys.hashCode());
return result;
}
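
After this revert a single HCatTableInfo instance carries both read-side state (filter, JobInfo) and write-side state (partition values, dynamic partitioning keys), discriminated by the TableInfoType enum above. A minimal sketch of branching on that discriminator, with placeholder names:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hcatalog.mapreduce.HCatTableInfo;

    class TableInfoTypeCheck {
        // Write-side fields are meaningful only on OUTPUT_INFO instances,
        // so check the discriminator before consulting them.
        static boolean usesDynamicPartitioning(HCatTableInfo info) {
            if (info.getTableType() == HCatTableInfo.TableInfoType.OUTPUT_INFO) {
                return info.isDynamicPartitioningUsed();
            }
            return false;
        }

        static HCatTableInfo sample() {
            Map<String, String> partitionValues = new HashMap<String, String>();
            partitionValues.put("ds", "20110801");  // placeholder
            return HCatTableInfo.getOutputTableInfo(
                null, null, "default", "mytable", partitionValues);
        }
    }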
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Mon Aug 1 17:17:04 2011
@@ -17,6 +17,13 @@
*/
package org.apache.hcatalog.mapreduce;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -31,9 +38,6 @@ import org.apache.hcatalog.common.HCatEx
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatSchema;
-import java.io.IOException;
-import java.util.*;
-
/**
* The Class which handles querying the metadata server using the MetaStoreClient. The list of
* partitions matching the partition filter is fetched from the server and the information is
@@ -46,15 +50,15 @@ public class InitializeInput {
static final String HCAT_KEY_PREFIX = "hcat.";
private static final HiveConf hiveConf = new HiveConf(HCatInputFormat.class);
- private static HiveMetaStoreClient createHiveMetaClient(Configuration conf, InputJobInfo inputJobInfo) throws Exception {
- if (inputJobInfo.getServerUri() != null){
+ private static HiveMetaStoreClient createHiveMetaClient(Configuration conf, HCatTableInfo inputInfo) throws Exception {
+ if (inputInfo.getServerUri() != null){
hiveConf.setBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
- inputJobInfo.getServerKerberosPrincipal());
+ inputInfo.getServerKerberosPrincipal());
hiveConf.set("hive.metastore.local", "false");
- hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, inputJobInfo.getServerUri());
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, inputInfo.getServerUri());
}
return new HiveMetaStoreClient(hiveConf,null);
@@ -64,29 +68,28 @@ public class InitializeInput {
* Set the input to use for the Job. This queries the metadata server with the specified partition predicates,
* gets the matching partitions, puts the information in the configuration object.
* @param job the job object
- * @param inputJobInfo information on the Input to read
+ * @param inputInfo the hcat table input info
* @throws Exception
*/
- public static void setInput(Job job, InputJobInfo inputJobInfo) throws Exception {
+ public static void setInput(Job job, HCatTableInfo inputInfo) throws Exception {
- //* Create and initialize an InputJobInfo object
- //* Serialize the InputJobInfo and save in the Job's Configuration object
+ //* Create and initialize a JobInfo object
+ //* Serialize the JobInfo and save in the Job's Configuration object
HiveMetaStoreClient client = null;
try {
- client = createHiveMetaClient(job.getConfiguration(),inputJobInfo);
- Table table = client.getTable(inputJobInfo.getDatabaseName(),
- inputJobInfo.getTableName());
+ client = createHiveMetaClient(job.getConfiguration(),inputInfo);
+ Table table = client.getTable(inputInfo.getDatabaseName(), inputInfo.getTableName());
+ HCatSchema tableSchema = HCatUtil.getTableSchemaWithPtnCols(table);
List<PartInfo> partInfoList = new ArrayList<PartInfo>();
if( table.getPartitionKeys().size() != 0 ) {
//Partitioned table
- List<Partition> parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(),
- inputJobInfo.getTableName(),
- inputJobInfo.getFilter(),
- (short) -1);
+ List<Partition> parts = client.listPartitionsByFilter(
+ inputInfo.getDatabaseName(), inputInfo.getTableName(),
+ inputInfo.getFilter(), (short) -1);
// Default to 100,000 partitions if hcat.metastore.maxpartitions is not defined
int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
@@ -107,12 +110,13 @@ public class InitializeInput {
partInfo.setPartitionValues(new HashMap<String,String>());
partInfoList.add(partInfo);
}
- inputJobInfo.setPartitions(partInfoList);
- inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
+
+ JobInfo hcatJobInfo = new JobInfo(inputInfo, tableSchema, partInfoList);
+ inputInfo.setJobInfo(hcatJobInfo);
job.getConfiguration().set(
HCatConstants.HCAT_KEY_JOB_INFO,
- HCatUtil.serialize(inputJobInfo)
+ HCatUtil.serialize(hcatJobInfo)
);
} finally {
if (client != null ) {
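
For context, this setInput() path is normally driven through HCatInputFormat on the front end. A minimal sketch under the API restored by this revert (the metastore URI, principal, table name, and filter below are placeholders, not values from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.HCatTableInfo;

    public class ReadSetupSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "hcat-read-sketch");
        // Optional: cap the number of partitions fetched from the metastore;
        // InitializeInput falls back to 100,000 when this key is unset.
        job.getConfiguration().setInt("hcat.metastore.maxpartitions", 10000);
        // Queries the metastore and serializes the resulting JobInfo into the
        // job configuration under HCatConstants.HCAT_KEY_JOB_INFO.
        HCatInputFormat.setInput(job, HCatTableInfo.getInputTableInfo(
            "thrift://metastore-host:9083",   // placeholder metastore URI
            "hcat/_HOST@EXAMPLE.COM",         // placeholder kerberos principal
            "default",                        // database name
            "web_logs",                       // placeholder table name
            "ds = \"20110801\""));            // partition filter
      }
    }
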
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/JobInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/JobInfo.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/JobInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/JobInfo.java Mon Aug 1 17:17:04 2011
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The class used to serialize and store the information read from the metadata server */
+public class JobInfo implements Serializable{
+
+ /** The serialization version */
+ private static final long serialVersionUID = 1L;
+
+ /** The db and table names. */
+ private final String dbName;
+ private final String tableName;
+
+ /** The table schema. */
+ private final HCatSchema tableSchema;
+
+ /** The list of partitions matching the filter. */
+ private final List<PartInfo> partitions;
+
+ /**
+ * Instantiates a new hcat job info.
+ * @param hcatTableInfo
+ * @param tableSchema the table schema
+ * @param partitions the partitions
+ */
+ public JobInfo(HCatTableInfo hcatTableInfo, HCatSchema tableSchema,
+ List<PartInfo> partitions) {
+ this.tableName = hcatTableInfo.getTableName();
+ this.dbName = hcatTableInfo.getDatabaseName();
+ this.tableSchema = tableSchema;
+ this.partitions = partitions;
+ }
+
+ /**
+ * Gets the value of dbName
+ * @return the dbName
+ */
+ public String getDatabaseName() {
+ return dbName;
+ }
+
+ /**
+ * Gets the value of tableName
+ * @return the tableName
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * Gets the value of tableSchema
+ * @return the tableSchema
+ */
+ public HCatSchema getTableSchema() {
+ return tableSchema;
+ }
+
+ /**
+ * Gets the value of partitions
+ * @return the partitions
+ */
+ public List<PartInfo> getPartitions() {
+ return partitions;
+ }
+
+}
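
On the task side, this JobInfo is read back out of the configuration. A sketch of that round trip, assuming HCatUtil exposes a deserialize() counterpart to the serialize() call made in InitializeInput above, and that PartInfo has a getPartitionValues() getter matching the setter shown earlier:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.common.HCatUtil;
    import org.apache.hcatalog.mapreduce.JobInfo;
    import org.apache.hcatalog.mapreduce.PartInfo;

    class JobInfoSketch {
      static void dumpPartitions(Configuration conf) throws Exception {
        String state = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
        JobInfo jobInfo = (JobInfo) HCatUtil.deserialize(state);
        System.out.println(jobInfo.getDatabaseName() + "." + jobInfo.getTableName());
        for (PartInfo part : jobInfo.getPartitions()) {
          System.out.println(part.getPartitionValues());
        }
      }
    }
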
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java Mon Aug 1 17:17:04 2011
@@ -18,126 +18,79 @@
package org.apache.hcatalog.mapreduce;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hcatalog.data.schema.HCatSchema;
-
import java.io.Serializable;
-import java.util.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hcatalog.data.schema.HCatSchema;
/** The class used to serialize and store the output related information */
-public class OutputJobInfo implements Serializable {
+class OutputJobInfo implements Serializable {
+
+ /** The serialization version. */
+ private static final long serialVersionUID = 1L;
+
+ /** The table info provided by user. */
+ private final HCatTableInfo tableInfo;
+
+ /** The output schema. This is given to us by the user. This won't contain any
+ * partition columns, even if the user has specified them.
+ */
+ private HCatSchema outputSchema;
+
+ /** This is table schema, retrieved from metastore. */
+ private final HCatSchema tableSchema;
+
+ /** The storer info */
+ private final StorerInfo storerInfo;
+
+ /** The location of the partition being written */
+ private final String location;
+
+ /** The table being written to */
+ private final Table table;
+
+ /** Positions of partition columns which will be deleted from the data, if
+ * the data contains partition columns. */
+
+ private List<Integer> posOfPartCols;
+ private List<Integer> posOfDynPartCols;
+
+ private int maxDynamicPartitions;
- /** The db and table names. */
- private final String databaseName;
- private final String tableName;
-
- /** The serialization version. */
- private static final long serialVersionUID = 1L;
-
- /** The table info provided by user. */
- private HCatTableInfo tableInfo;
-
- /** The output schema. This is given to us by the user. This won't contain any
- * partition columns, even if the user has specified them.
- */
- private HCatSchema outputSchema;
-
- /** The location of the partition being written */
- private String location;
-
- /** The partition values to publish to, if used for output*/
- private Map<String, String> partitionValues;
-
- /** The Metadata server uri */
- private final String serverUri;
-
- /** If the hcat server is configured to work with hadoop security, this
- * variable will hold the principal name of the server - this will be used
- * in the authentication to the hcat server using kerberos
- */
- private final String serverKerberosPrincipal;
-
- private List<Integer> posOfPartCols;
- private List<Integer> posOfDynPartCols;
-
- private Map<String,String> properties;
-
- private int maxDynamicPartitions;
-
- /** List of keys for which values were not specified at write setup time, to be inferred at write time */
- private List<String> dynamicPartitioningKeys;
-
- private boolean harRequested;
-
- /**
- * Initializes a new OutputJobInfo instance
- * for writing data to a table.
- * @param databaseName the db name
- * @param tableName the table name
- * @param partitionValues The partition values to publish to; can be null or an empty Map to
- * indicate a write to an unpartitioned table. For partitioned tables, this map should
- * contain keys for all partition columns with corresponding values.
- * @param serverUri the Metadata server uri
- * @param serverKerberosPrincipal If the hcat server is configured to
- * work with hadoop security, the kerberos principal name of the server - else null.
- * The principal name should be of the form:
- * <servicename>/_HOST@<realm> like "hcat/_HOST@myrealm.com"
- * The special string _HOST will be replaced automatically with the correct host name
- */
- public static OutputJobInfo create(String databaseName,
- String tableName,
- Map<String, String> partitionValues,
- String serverUri,
- String serverKerberosPrincipal) {
- return new OutputJobInfo(databaseName,
- tableName,
- partitionValues,
- serverUri,
- serverKerberosPrincipal);
- }
-
- private OutputJobInfo(String databaseName,
- String tableName,
- Map<String, String> partitionValues,
- String serverUri,
- String serverKerberosPrincipal) {
- this.databaseName = (databaseName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
- this.tableName = tableName;
- this.serverUri = serverUri;
- this.serverKerberosPrincipal = serverKerberosPrincipal;
- this.partitionValues = partitionValues;
- this.properties = new HashMap<String,String>();
- }
-
- /**
- * @return the posOfPartCols
- */
- protected List<Integer> getPosOfPartCols() {
- return posOfPartCols;
- }
-
- /**
- * @return the posOfDynPartCols
- */
- protected List<Integer> getPosOfDynPartCols() {
- return posOfDynPartCols;
- }
-
- /**
- * @param posOfPartCols the posOfPartCols to set
- */
- protected void setPosOfPartCols(List<Integer> posOfPartCols) {
- // sorting the list in the descending order so that deletes happen back-to-front
- Collections.sort(posOfPartCols, new Comparator<Integer> () {
- @Override
- public int compare(Integer earlier, Integer later) {
- return (earlier > later) ? -1 : ((earlier == later) ? 0 : 1);
- }
- });
- this.posOfPartCols = posOfPartCols;
- }
+ private boolean harRequested;
- /**
+ /**
+ * @return the posOfPartCols
+ */
+ protected List<Integer> getPosOfPartCols() {
+ return posOfPartCols;
+ }
+
+ /**
+ * @return the posOfDynPartCols
+ */
+ protected List<Integer> getPosOfDynPartCols() {
+ return posOfDynPartCols;
+ }
+
+ /**
+ * @param posOfPartCols the posOfPartCols to set
+ */
+ protected void setPosOfPartCols(List<Integer> posOfPartCols) {
+ // sort in descending order so that deletes happen back-to-front
+ // (see the index-removal sketch after this file's diff)
+ Collections.sort(posOfPartCols, new Comparator<Integer> () {
+ @Override
+ public int compare(Integer earlier, Integer later) {
+ // compareTo sorts descending here and avoids the boxed-Integer == pitfall
+ return later.compareTo(earlier);
+ }
+ });
+ this.posOfPartCols = posOfPartCols;
+ }
+
+ /**
* @param posOfDynPartCols the posOfDynPartCols to set
*/
protected void setPosOfDynPartCols(List<Integer> posOfDynPartCols) {
@@ -145,153 +98,97 @@ public class OutputJobInfo implements Se
this.posOfDynPartCols = posOfDynPartCols;
}
- /**
- * @return the tableInfo
- */
- public HCatTableInfo getTableInfo() {
- return tableInfo;
- }
-
- /**
- * @return the outputSchema
- */
- public HCatSchema getOutputSchema() {
- return outputSchema;
- }
-
- /**
- * @param schema the outputSchema to set
- */
- public void setOutputSchema(HCatSchema schema) {
- this.outputSchema = schema;
- }
-
- /**
- * @return the location
- */
- public String getLocation() {
- return location;
- }
-
- /**
- * @param location location to write to
- */
- void setLocation(String location) {
- this.location = location;
- }
- /**
- * Sets the value of partitionValues
- * @param partitionValues the partition values to set
- */
- void setPartitionValues(Map<String, String> partitionValues) {
- this.partitionValues = partitionValues;
- }
-
- /**
- * Gets the value of partitionValues
- * @return the partitionValues
- */
- public Map<String, String> getPartitionValues() {
- return partitionValues;
- }
-
- /**
- * @return metastore thrift server URI
- */
- public String getServerUri() {
- return serverUri;
- }
-
- /**
- * @return the serverKerberosPrincipal
- */
- public String getServerKerberosPrincipal() {
- return serverKerberosPrincipal;
- }
-
- /**
- * Sets the tableInfo instance; this should be the instance
- * corresponding to this object's databaseName and tableName
- * @param tableInfo the table info to set
- */
- void setTableInfo(HCatTableInfo tableInfo) {
- this.tableInfo = tableInfo;
- }
-
- /**
- * @return database name of table to write to
- */
- public String getDatabaseName() {
- return databaseName;
- }
-
- /**
- * @return name of table to write to
- */
- public String getTableName() {
- return tableName;
- }
-
- /**
- * Gets the property information to be passed down to the *StorageDriver implementation;
- * put implementation-specific storage driver configurations here
- * @return the properties map
- */
- public Map<String,String> getProperties() {
- return properties;
- }
-
- /**
- * Set maximum number of allowable dynamic partitions
- * @param maxDynamicPartitions
- */
- public void setMaximumDynamicPartitions(int maxDynamicPartitions){
- this.maxDynamicPartitions = maxDynamicPartitions;
- }
-
- /**
- * Returns maximum number of allowable dynamic partitions
- * @return maximum number of allowable dynamic partitions
- */
- public int getMaxDynamicPartitions() {
- return this.maxDynamicPartitions;
- }
-
- /**
- * Sets whether or not hadoop archiving has been requested for this job
- * @param harRequested
- */
- public void setHarRequested(boolean harRequested){
- this.harRequested = harRequested;
- }
-
- /**
- * Returns whether or not hadoop archiving has been requested for this job
- * @return whether or not hadoop archiving has been requested for this job
- */
- public boolean getHarRequested() {
- return this.harRequested;
- }
-
- /**
- * Returns whether or not Dynamic Partitioning is used
- * @return whether or not dynamic partitioning is currently enabled and used
- */
- public boolean isDynamicPartitioningUsed() {
- return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty()));
- }
-
- /**
- * Sets the list of dynamic partitioning keys used when writing output without specifying all the partition key values
- * @param dynamicPartitioningKeys the dynamic partitioning keys
- */
- public void setDynamicPartitioningKeys(List<String> dynamicPartitioningKeys) {
- this.dynamicPartitioningKeys = dynamicPartitioningKeys;
- }
-
- public List<String> getDynamicPartitioningKeys(){
- return this.dynamicPartitioningKeys;
- }
+ public OutputJobInfo(HCatTableInfo tableInfo, HCatSchema outputSchema, HCatSchema tableSchema,
+ StorerInfo storerInfo, String location, Table table) {
+ super();
+ this.tableInfo = tableInfo;
+ this.outputSchema = outputSchema;
+ this.tableSchema = tableSchema;
+ this.storerInfo = storerInfo;
+ this.location = location;
+ this.table = table;
+ }
+
+ /**
+ * @return the tableInfo
+ */
+ public HCatTableInfo getTableInfo() {
+ return tableInfo;
+ }
+
+ /**
+ * @return the outputSchema
+ */
+ public HCatSchema getOutputSchema() {
+ return outputSchema;
+ }
+
+ /**
+ * @param schema the outputSchema to set
+ */
+ public void setOutputSchema(HCatSchema schema) {
+ this.outputSchema = schema;
+ }
+
+ /**
+ * @return the tableSchema
+ */
+ public HCatSchema getTableSchema() {
+ return tableSchema;
+ }
+
+ /**
+ * @return the storerInfo
+ */
+ public StorerInfo getStorerInfo() {
+ return storerInfo;
+ }
+
+ /**
+ * @return the location
+ */
+ public String getLocation() {
+ return location;
+ }
+
+ /**
+ * Gets the value of table
+ * @return the table
+ */
+ public Table getTable() {
+ return table;
+ }
+
+ /**
+ * Set maximum number of allowable dynamic partitions
+ * @param maxDynamicPartitions
+ */
+ public void setMaximumDynamicPartitions(int maxDynamicPartitions){
+ this.maxDynamicPartitions = maxDynamicPartitions;
+ }
+
+ /**
+ * Returns maximum number of allowable dynamic partitions
+ * @return maximum number of allowable dynamic partitions
+ */
+ public int getMaxDynamicPartitions() {
+ return this.maxDynamicPartitions;
+ }
+
+ /**
+ * Sets whether or not hadoop archiving has been requested for this job
+ * @param harRequested
+ */
+ public void setHarRequested(boolean harRequested){
+ this.harRequested = harRequested;
+ }
+
+ /**
+ * Returns whether or not hadoop archiving has been requested for this job
+ * @return whether or not hadoop archiving has been requested for this job
+ */
+ public boolean getHarRequested() {
+ return this.harRequested;
+ }
}
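
The descending sort in setPosOfPartCols matters because removing an element from a list shifts every later index left; deleting back-to-front keeps the not-yet-deleted positions valid. A small self-contained illustration of that invariant (the data here is made up):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class BackToFrontDelete {
      public static void main(String[] args) {
        List<String> record =
            new ArrayList<String>(Arrays.asList("a", "b", "c", "d", "e"));
        List<Integer> posOfPartCols = new ArrayList<Integer>(Arrays.asList(1, 3));

        // Descending order, mirroring setPosOfPartCols
        Collections.sort(posOfPartCols, Collections.reverseOrder());
        for (Integer pos : posOfPartCols) {
          record.remove(pos.intValue());  // intValue(): remove-by-index, not remove(Object)
        }
        System.out.println(record);       // prints [a, c, e]
        // Front-to-back (1 then 3) would instead drop "b" and "e", because
        // removing index 1 shifts "d" and "e" one position to the left.
      }
    }
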
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java Mon Aug 1 17:17:04 2011
@@ -17,6 +17,10 @@
*/
package org.apache.hcatalog.pig;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -27,7 +31,7 @@ import org.apache.hcatalog.common.HCatUt
import org.apache.hcatalog.data.Pair;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
import org.apache.pig.Expression;
import org.apache.pig.Expression.BinaryExpression;
import org.apache.pig.LoadFunc;
@@ -35,10 +39,6 @@ import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.impl.util.UDFContext;
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
/**
* Pig {@link LoadFunc} to read data from HCat
*/
@@ -82,12 +82,14 @@ public class HCatLoader extends HCatBase
// in the hadoop front end mapred.task.id property will not be set in
// the Configuration
if (!HCatUtil.checkJobContextIfRunningFromBackend(job)){
- HCatInputFormat.setInput(job,
- InputJobInfo.create(dbName,
- tableName,
- getPartitionFilterString(),
- hcatServerUri != null ? hcatServerUri : (hcatServerUri = PigHCatUtil.getHCatServerUri(job)),
- PigHCatUtil.getHCatServerPrincipal(job)));
+
+ HCatInputFormat.setInput(job, HCatTableInfo.getInputTableInfo(
+ hcatServerUri!=null ? hcatServerUri :
+ (hcatServerUri = PigHCatUtil.getHCatServerUri(job)),
+ PigHCatUtil.getHCatServerPrincipal(job),
+ dbName,
+ tableName,
+ getPartitionFilterString()));
}
// Need to also push projections by calling setOutputSchema on
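
The front-end guard above hinges on mapred.task.id: the MR framework sets it for tasks launched on the cluster, while client-side (front-end) planning runs without it. A sketch of what HCatUtil.checkJobContextIfRunningFromBackend presumably amounts to (the method body here is illustrative, not the committed implementation):

    import org.apache.hadoop.mapreduce.JobContext;

    class BackendCheckSketch {
      // True only when running inside a launched task attempt.
      static boolean runningFromBackend(JobContext context) {
        return context.getConfiguration().get("mapred.task.id") != null;
      }
    }
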
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Mon Aug 1 17:17:04 2011
@@ -18,6 +18,9 @@
package org.apache.hcatalog.pig;
+import java.io.IOException;
+import java.util.Properties;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -27,7 +30,7 @@ import org.apache.hcatalog.common.HCatUt
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -35,9 +38,6 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
-import java.io.IOException;
-import java.util.Properties;
-
/**
* HCatStorer.
*
@@ -72,20 +72,13 @@ public class HCatStorer extends HCatBase
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
String[] userStr = location.split("\\.");
- OutputJobInfo outputJobInfo;
-
+ HCatTableInfo tblInfo;
if(userStr.length == 2) {
- outputJobInfo = OutputJobInfo.create(userStr[0],
- userStr[1],
- partitions,
- PigHCatUtil.getHCatServerUri(job),
- PigHCatUtil.getHCatServerPrincipal(job));
+ tblInfo = HCatTableInfo.getOutputTableInfo(PigHCatUtil.getHCatServerUri(job),
+ PigHCatUtil.getHCatServerPrincipal(job), userStr[0],userStr[1],partitions);
} else {
- outputJobInfo = OutputJobInfo.create(null,
- userStr[0],
- partitions,
- PigHCatUtil.getHCatServerUri(job),
- PigHCatUtil.getHCatServerPrincipal(job));
+ tblInfo = HCatTableInfo.getOutputTableInfo(PigHCatUtil.getHCatServerUri(job),
+ PigHCatUtil.getHCatServerPrincipal(job), null,userStr[0],partitions);
}
@@ -101,7 +94,7 @@ public class HCatStorer extends HCatBase
throw new FrontendException("Schema for data cannot be determined.", PigHCatUtil.PIG_EXCEPTION_CODE);
}
try{
- HCatOutputFormat.setOutput(job, outputJobInfo);
+ HCatOutputFormat.setOutput(job, tblInfo);
} catch(HCatException he) {
// pass the message to the user - essentially something about the table
// information passed to HCatOutputFormat was not right