You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by kh...@apache.org on 2011/07/22 23:38:14 UTC
svn commit: r1149763 [2/2] - in /incubator/hcatalog/trunk: ./
src/java/org/apache/hcatalog/common/ src/java/org/apache/hcatalog/har/
src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/pig/
src/java/org/apache/hcatalog/rcfile/ src/test...
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java Fri Jul 22 23:38:07 2011
@@ -19,6 +19,7 @@
package org.apache.hcatalog.mapreduce;
import java.io.Serializable;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -66,6 +67,10 @@ public class HCatTableInfo implements Se
/** The partition values to publish to, if used for output*/
private Map<String, String> partitionValues;
+ /** List of keys for which values were not specified at write setup time, to be inferred at write time */
+ private List<String> dynamicPartitioningKeys;
+
+
/**
* Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat}
* for reading data from a table.
@@ -229,6 +234,27 @@ public class HCatTableInfo implements Se
return serverKerberosPrincipal;
}
+ /**
+ * Returns whether or not Dynamic Partitioning is used
+ * @return whether or not dynamic partitioning is currently enabled and used
+ */
+ public boolean isDynamicPartitioningUsed() {
+ return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty()));
+ }
+
+ /**
+ * Sets the list of dynamic partitioning keys used for outputting without specifying all the keys
+ * @param dynamicPartitioningKeys the keys whose values are not specified up front when outputting
+ */
+ public void setDynamicPartitioningKeys(List<String> dynamicPartitioningKeys) {
+ this.dynamicPartitioningKeys = dynamicPartitioningKeys;
+ }
+
+ public List<String> getDynamicPartitioningKeys(){
+ return this.dynamicPartitioningKeys;
+ }
+
+
@Override
public int hashCode() {
int result = 17;
@@ -240,8 +266,9 @@ public class HCatTableInfo implements Se
result = 31*result + (partitionPredicates == null ? 0 : partitionPredicates.hashCode());
result = 31*result + tableInfoType.ordinal();
result = 31*result + (partitionValues == null ? 0 : partitionValues.hashCode());
+ result = 31*result + (dynamicPartitioningKeys == null ? 0 : dynamicPartitioningKeys.hashCode());
return result;
-
}
+
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java Fri Jul 22 23:38:07 2011
@@ -56,6 +56,11 @@ class OutputJobInfo implements Serializa
* data contains partition columns.*/
private List<Integer> posOfPartCols;
+ private List<Integer> posOfDynPartCols;
+
+ private int maxDynamicPartitions;
+
+ private boolean harRequested;
/**
* @return the posOfPartCols
@@ -65,6 +70,13 @@ class OutputJobInfo implements Serializa
}
/**
+ * @return the posOfDynPartCols
+ */
+ protected List<Integer> getPosOfDynPartCols() {
+ return posOfDynPartCols;
+ }
+
+ /**
* @param posOfPartCols the posOfPartCols to set
*/
protected void setPosOfPartCols(List<Integer> posOfPartCols) {
@@ -78,6 +90,14 @@ class OutputJobInfo implements Serializa
this.posOfPartCols = posOfPartCols;
}
+ /**
+ * @param posOfDynPartCols the posOfDynPartCols to set
+ */
+ protected void setPosOfDynPartCols(List<Integer> posOfDynPartCols) {
+ // Important - no sorting here! We retain order, it's used to match with values at runtime
+ this.posOfDynPartCols = posOfDynPartCols;
+ }
+
public OutputJobInfo(HCatTableInfo tableInfo, HCatSchema outputSchema, HCatSchema tableSchema,
StorerInfo storerInfo, String location, Table table) {
super();
@@ -139,4 +159,36 @@ class OutputJobInfo implements Serializa
return table;
}
+ /**
+ * Set maximum number of allowable dynamic partitions
+ * @param maxDynamicPartitions maximum number of allowable dynamic partitions
+ */
+ public void setMaximumDynamicPartitions(int maxDynamicPartitions){
+ this.maxDynamicPartitions = maxDynamicPartitions;
+ }
+
+ /**
+ * Returns maximum number of allowable dynamic partitions
+ * @return maximum number of allowable dynamic partitions
+ */
+ public int getMaxDynamicPartitions() {
+ return this.maxDynamicPartitions;
+ }
+
+ /**
+ * Sets whether or not hadoop archiving has been requested for this job
+ * @param harRequested whether or not hadoop archiving has been requested for this job
+ */
+ public void setHarRequested(boolean harRequested){
+ this.harRequested = harRequested;
+ }
+
+ /**
+ * Returns whether or not hadoop archiving has been requested for this job
+ * @return whether or not hadoop archiving has been requested for this job
+ */
+ public boolean getHarRequested() {
+ return this.harRequested;
+ }
+
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java Fri Jul 22 23:38:07 2011
@@ -145,7 +145,7 @@ public class HCatEximStorer extends HCat
//In local mode, mapreduce will not call HCatOutputCommitter.cleanupJob.
//Calling it from here so that the partition publish happens.
//This call needs to be removed after MAPREDUCE-1447 is fixed.
- new HCatEximOutputCommitter(null).cleanupJob(job);
+ new HCatEximOutputCommitter(job,null).cleanupJob(job);
}
}
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Fri Jul 22 23:38:07 2011
@@ -109,34 +109,35 @@ public class HCatStorer extends HCatBase
computedSchema = convertPigSchemaToHCatSchema(pigSchema,hcatTblSchema);
HCatOutputFormat.setSchema(job, computedSchema);
p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO, config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
- if(config.get(HCatConstants.HCAT_KEY_HIVE_CONF) != null){
- p.setProperty(HCatConstants.HCAT_KEY_HIVE_CONF, config.get(HCatConstants.HCAT_KEY_HIVE_CONF));
- }
- if(config.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null){
- p.setProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE,
- config.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
- }
+
+ PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_HIVE_CONF);
+ PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
+ PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
+ PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
+ PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+
p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
}else{
config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO));
- if(p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF) != null){
- config.set(HCatConstants.HCAT_KEY_HIVE_CONF, p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF));
- }
- if(p.getProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null){
- config.set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE,
- p.getProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
- }
+
+ PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_HIVE_CONF);
+ PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
+ PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
+ PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
+ PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+
}
}
+
@Override
public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
//In local mode, mapreduce will not call HCatOutputCommitter.cleanupJob.
//Calling it from here so that the partition publish happens.
//This call needs to be removed after MAPREDUCE-1447 is fixed.
- new HCatOutputCommitter(null).cleanupJob(job);
+ new HCatOutputCommitter(job,null).cleanupJob(job);
}
}
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java Fri Jul 22 23:38:07 2011
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -405,4 +406,16 @@ public static Object extractPigObject(Ob
}
+ public static void getConfigFromUDFProperties(Properties p, Configuration config, String propName) {
+ if(p.getProperty(propName) != null){
+ config.set(propName, p.getProperty(propName));
+ }
+ }
+
+ public static void saveConfigIntoUDFProperties(Properties p, Configuration config, String propName) {
+ if(config.get(propName) != null){
+ p.setProperty(propName, config.get(propName));
+ }
+ }
+
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -19,6 +19,8 @@ package org.apache.hcatalog.rcfile;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java Fri Jul 22 23:38:07 2011
@@ -45,6 +45,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Fri Jul 22 23:38:07 2011
@@ -243,7 +243,7 @@ public abstract class HCatMapReduceTest
void runMRCreate(Map<String, String> partitionValues,
List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
- int writeCount) throws Exception {
+ int writeCount, boolean assertWrite) throws Exception {
writeRecords = records;
MapCreate.writeCount = 0;
@@ -275,8 +275,11 @@ public abstract class HCatMapReduceTest
//new HCatOutputCommitter(null).setupJob(job);
job.waitForCompletion(true);
- new HCatOutputCommitter(null).cleanupJob(job);
- Assert.assertEquals(writeCount, MapCreate.writeCount);
+ new HCatOutputCommitter(job,null).cleanupJob(job);
+ if (assertWrite){
+ // Check the write count only when the caller asked for it via assertWrite.
+ Assert.assertEquals(writeCount, MapCreate.writeCount);
+ }
}
List<HCatRecord> runMRRead(int readCount) throws Exception {
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1149763&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java Fri Jul 22 23:38:07 2011
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+
+public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
+
+ private List<HCatRecord> writeRecords;
+ private List<HCatFieldSchema> dataColumns;
+
+ @Override
+ protected void initialize() throws Exception {
+
+ tableName = "testHCatDynamicPartitionedTable";
+ generateWriteRecords(20,5,0);
+ generateDataColumns();
+ }
+
+ private void generateDataColumns() throws HCatException {
+ dataColumns = new ArrayList<HCatFieldSchema>();
+ dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+ dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+ dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", Constants.STRING_TYPE_NAME, "")));
+ }
+
+ private void generateWriteRecords(int max, int mod,int offset) {
+ writeRecords = new ArrayList<HCatRecord>();
+
+ for(int i = 0;i < max;i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("strvalue" + i);
+ objList.add(String.valueOf((i % mod)+offset));
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+ }
+
+ @Override
+ protected List<FieldSchema> getPartitionKeys() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("p1", Constants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+ @Override
+ protected List<FieldSchema> getTableColumns() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+ fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+
+ public void testHCatDynamicPartitionedTable() throws Exception {
+
+ generateWriteRecords(20,5,0);
+ runMRCreate(null, dataColumns, writeRecords, 20,true);
+
+ runMRRead(20);
+
+ //Read with partition filter
+ runMRRead(4, "p1 = \"0\"");
+ runMRRead(8, "p1 = \"1\" or p1 = \"3\"");
+ runMRRead(4, "p1 = \"4\"");
+
+ // read from hive to test
+
+ String query = "select * from " + tableName;
+ int retCode = driver.run(query).getResponseCode();
+
+ if( retCode != 0 ) {
+ throw new Exception("Error " + retCode + " running query " + query);
+ }
+
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ assertEquals(20, res.size());
+
+
+ //Test for duplicate publish
+ IOException exc = null;
+ try {
+ generateWriteRecords(20,5,0);
+ runMRCreate(null, dataColumns, writeRecords, 20,false);
+ } catch(IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType());
+ }
+
+ public void testHCatDynamicPartitionMaxPartitions() throws Exception {
+ HiveConf hc = new HiveConf(this.getClass());
+
+ int maxParts = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
+ System.out.println("Max partitions allowed = " + maxParts);
+
+ IOException exc = null;
+ try {
+ generateWriteRecords(maxParts+5,maxParts+2,10);
+ runMRCreate(null,dataColumns,writeRecords,maxParts+5,false);
+ } catch(IOException e) {
+ exc = e;
+ }
+
+ if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, ((HCatException) exc).getErrorType());
+ }else{
+ assertTrue(exc == null);
+ runMRRead(maxParts+5);
+ }
+ }
+}
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java Fri Jul 22 23:38:07 2011
@@ -228,7 +228,7 @@ public class TestHCatEximInputFormat ext
schema);
job.waitForCompletion(true);
- HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+ HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null);
committer.cleanupJob(job);
}
@@ -247,7 +247,7 @@ public class TestHCatEximInputFormat ext
schema);
job.waitForCompletion(true);
- HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+ HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null);
committer.cleanupJob(job);
}
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -109,7 +109,7 @@ public class TestHCatEximOutputFormat ex
schema);
job.waitForCompletion(true);
- HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+ HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null);
committer.cleanupJob(job);
Path metadataPath = new Path(outputLocation, "_metadata");
@@ -165,7 +165,7 @@ public class TestHCatEximOutputFormat ex
schema);
job.waitForCompletion(true);
- HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+ HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null);
committer.cleanupJob(job);
Path metadataPath = new Path(outputLocation, "_metadata");
Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath);
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java Fri Jul 22 23:38:07 2011
@@ -78,12 +78,12 @@ public class TestHCatNonPartitioned exte
public void testHCatNonPartitionedTable() throws Exception {
Map<String, String> partitionMap = new HashMap<String, String>();
- runMRCreate(null, partitionColumns, writeRecords, 10);
+ runMRCreate(null, partitionColumns, writeRecords, 10,true);
//Test for duplicate publish
IOException exc = null;
try {
- runMRCreate(null, partitionColumns, writeRecords, 20);
+ runMRCreate(null, partitionColumns, writeRecords, 20,true);
} catch(IOException e) {
exc = e;
}
@@ -98,7 +98,7 @@ public class TestHCatNonPartitioned exte
partitionMap.put("px", "p1value2");
try {
- runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
} catch(IOException e) {
exc = e;
}
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -160,7 +160,7 @@ public class TestHCatOutputFormat extend
}
public void publishTest(Job job) throws Exception {
- OutputCommitter committer = new HCatOutputCommitter(null);
+ OutputCommitter committer = new HCatOutputCommitter(job,null);
committer.cleanupJob(job);
Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java Fri Jul 22 23:38:07 2011
@@ -80,17 +80,17 @@ public class TestHCatPartitioned extends
Map<String, String> partitionMap = new HashMap<String, String>();
partitionMap.put("part1", "p1value1");
- runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
partitionMap.clear();
partitionMap.put("PART1", "p1value2");
- runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
//Test for duplicate publish
IOException exc = null;
try {
- runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
} catch(IOException e) {
exc = e;
}
@@ -105,7 +105,7 @@ public class TestHCatPartitioned extends
partitionMap.put("px", "p1value2");
try {
- runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
} catch(IOException e) {
exc = e;
}
@@ -118,14 +118,15 @@ public class TestHCatPartitioned extends
//Test for null partition value map
exc = null;
try {
- runMRCreate(null, partitionColumns, writeRecords, 20);
+ runMRCreate(null, partitionColumns, writeRecords, 20,false);
} catch(IOException e) {
exc = e;
}
- assertTrue(exc != null);
- assertTrue(exc instanceof HCatException);
- assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
+ assertTrue(exc == null);
+// assertTrue(exc instanceof HCatException);
+// assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType());
+ // With dynamic partitioning, it is not an error for the partition key values to be left unspecified
//Read should get 10 + 20 rows
runMRRead(30);
@@ -166,7 +167,7 @@ public class TestHCatPartitioned extends
Map<String, String> partitionMap = new HashMap<String, String>();
partitionMap.put("part1", "p1value5");
- runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
tableSchema = getTableSchema();
@@ -187,7 +188,7 @@ public class TestHCatPartitioned extends
IOException exc = null;
try {
- runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
} catch(IOException e) {
exc = e;
}
@@ -217,7 +218,7 @@ public class TestHCatPartitioned extends
exc = null;
try {
- runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20);
+ runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20,true);
} catch(IOException e) {
exc = e;
}
@@ -266,7 +267,7 @@ public class TestHCatPartitioned extends
Exception exc = null;
try {
- runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
} catch(IOException e) {
exc = e;
}
@@ -291,7 +292,7 @@ public class TestHCatPartitioned extends
writeRecords.add(new DefaultHCatRecord(objList));
}
- runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
//Read should get 10 + 20 + 10 + 10 + 20 rows
runMRRead(70);
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java Fri Jul 22 23:38:07 2011
@@ -585,4 +585,122 @@ public class TestHCatStorer extends Test
assertFalse(itr.hasNext());
}
+
+
+ public void testDynamicPartitioningMultiPartColsInDataPartialSpec() throws IOException, CommandNeedRetryException{
+
+ driver.run("drop table if exists employee");
+ String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
+ " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE " +
+ "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+ "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+
+ MiniCluster.deleteFile(cluster, fullFileName);
+ String[] inputData = {"111237\tKrishna\t01/01/1990\tM\tIN\tTN",
+ "111238\tKalpana\t01/01/2000\tF\tIN\tKA",
+ "111239\tSatya\t01/01/2001\tM\tIN\tKL",
+ "111240\tKavya\t01/01/2002\tF\tIN\tAP"};
+
+ MiniCluster.createInputFile(cluster, fullFileName, inputData);
+ PigServer pig = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ pig.setBatchOn();
+ pig.registerQuery("A = LOAD '"+fullFileName+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
+ "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
+ pig.registerQuery("IN = FILTER A BY emp_country == 'IN';");
+ pig.registerQuery("STORE IN INTO 'employee' USING "+HCatStorer.class.getName()+"('emp_country=IN');");
+ pig.executeBatch();
+ driver.run("select * from employee");
+ ArrayList<String> results = new ArrayList<String>();
+ driver.getResults(results);
+ assertEquals(4, results.size());
+ Collections.sort(results);
+ assertEquals(inputData[0], results.get(0));
+ assertEquals(inputData[1], results.get(1));
+ assertEquals(inputData[2], results.get(2));
+ assertEquals(inputData[3], results.get(3));
+ MiniCluster.deleteFile(cluster, fullFileName);
+ driver.run("drop table employee");
+ }
+
+ public void testDynamicPartitioningMultiPartColsInDataNoSpec() throws IOException, CommandNeedRetryException{
+
+ driver.run("drop table if exists employee");
+ String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
+ " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE " +
+ "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+ "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+
+ MiniCluster.deleteFile(cluster, fullFileName);
+ String[] inputData = {"111237\tKrishna\t01/01/1990\tM\tIN\tTN",
+ "111238\tKalpana\t01/01/2000\tF\tIN\tKA",
+ "111239\tSatya\t01/01/2001\tM\tIN\tKL",
+ "111240\tKavya\t01/01/2002\tF\tIN\tAP"};
+
+ MiniCluster.createInputFile(cluster, fullFileName, inputData);
+ PigServer pig = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ pig.setBatchOn();
+ pig.registerQuery("A = LOAD '"+fullFileName+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
+ "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
+ pig.registerQuery("IN = FILTER A BY emp_country == 'IN';");
+ pig.registerQuery("STORE IN INTO 'employee' USING "+HCatStorer.class.getName()+"();");
+ pig.executeBatch();
+ driver.run("select * from employee");
+ ArrayList<String> results = new ArrayList<String>();
+ driver.getResults(results);
+ assertEquals(4, results.size());
+ Collections.sort(results);
+ assertEquals(inputData[0], results.get(0));
+ assertEquals(inputData[1], results.get(1));
+ assertEquals(inputData[2], results.get(2));
+ assertEquals(inputData[3], results.get(3));
+ MiniCluster.deleteFile(cluster, fullFileName);
+ driver.run("drop table employee");
+ }
+
+ public void testDynamicPartitioningMultiPartColsNoDataInDataNoSpec() throws IOException, CommandNeedRetryException{
+
+ driver.run("drop table if exists employee");
+ String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
+ " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE " +
+ "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+ "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+
+ MiniCluster.deleteFile(cluster, fullFileName);
+ String[] inputData = {};
+
+ MiniCluster.createInputFile(cluster, fullFileName, inputData);
+ PigServer pig = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ pig.setBatchOn();
+ pig.registerQuery("A = LOAD '"+fullFileName+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
+ "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
+ pig.registerQuery("IN = FILTER A BY emp_country == 'IN';");
+ pig.registerQuery("STORE IN INTO 'employee' USING "+HCatStorer.class.getName()+"();");
+ pig.executeBatch();
+ driver.run("select * from employee");
+ ArrayList<String> results = new ArrayList<String>();
+ driver.getResults(results);
+ assertEquals(0, results.size());
+ MiniCluster.deleteFile(cluster, fullFileName);
+ driver.run("drop table employee");
+ }
+
+
}