Posted to hcatalog-commits@incubator.apache.org by kh...@apache.org on 2011/07/22 23:38:14 UTC

svn commit: r1149763 [2/2] - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/common/ src/java/org/apache/hcatalog/har/ src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/pig/ src/java/org/apache/hcatalog/rcfile/ src/test...

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java Fri Jul 22 23:38:07 2011
@@ -19,6 +19,7 @@
 package org.apache.hcatalog.mapreduce;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -66,6 +67,10 @@ public class HCatTableInfo implements Se
   /** The partition values to publish to, if used for output*/
   private Map<String, String> partitionValues;
 
+  /** List of keys for which values were not specified at write setup time, to be inferred at write time */
+  private List<String> dynamicPartitioningKeys;
+  
+
   /**
    * Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat}
    * for reading data from a table.
@@ -229,6 +234,27 @@ public class HCatTableInfo implements Se
     return serverKerberosPrincipal;
   }
 
+  /**
+   * Returns whether dynamic partitioning is used
+   * @return true if dynamic partitioning is currently enabled and in use
+   */
+  public boolean isDynamicPartitioningUsed() {
+    return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty()));
+  }
+
+  /**
+   * Sets the list of dynamic partitioning keys, i.e. the partition keys whose values are left unspecified at write setup time and inferred at write time
+   * @param dynamicPartitioningKeys
+   */
+  public void setDynamicPartitioningKeys(List<String> dynamicPartitioningKeys) {
+    this.dynamicPartitioningKeys = dynamicPartitioningKeys;
+  }
+  
+  public List<String> getDynamicPartitioningKeys(){
+    return this.dynamicPartitioningKeys;
+  }
+
+
   @Override
   public int hashCode() {
     int result = 17;
@@ -240,8 +266,9 @@ public class HCatTableInfo implements Se
     result = 31*result + (partitionPredicates == null ? 0 : partitionPredicates.hashCode());
     result = 31*result + tableInfoType.ordinal();
     result = 31*result + (partitionValues == null ? 0 : partitionValues.hashCode());
+    result = 31*result + (dynamicPartitioningKeys == null ? 0 : dynamicPartitioningKeys.hashCode());
     return result;
-
   }
+
 }
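For context, a minimal usage sketch of the new API (not part of this commit; assumes tableInfo is an output-side HCatTableInfo obtained through the usual HCatOutputFormat setup):

    import java.util.Arrays;
    import java.util.List;

    // Mark two partition keys as dynamic; their values are inferred at write time.
    List<String> dynKeys = Arrays.asList("emp_country", "emp_state");
    tableInfo.setDynamicPartitioningKeys(dynKeys);

    // isDynamicPartitioningUsed() is true exactly when the key list is non-null and non-empty.
    if (tableInfo.isDynamicPartitioningUsed()) {
      for (String key : tableInfo.getDynamicPartitioningKeys()) {
        System.out.println("dynamic partition key: " + key);
      }
    }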
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java Fri Jul 22 23:38:07 2011
@@ -56,6 +56,11 @@ class OutputJobInfo implements Serializa
      * data contains partition columns.*/
 
     private List<Integer> posOfPartCols;
+    private List<Integer> posOfDynPartCols;
+
+    private int maxDynamicPartitions;
+
+    private boolean harRequested;
 
     /**
      * @return the posOfPartCols
@@ -65,6 +70,13 @@ class OutputJobInfo implements Serializa
     }
 
     /**
+     * @return the posOfDynPartCols
+     */
+    protected List<Integer> getPosOfDynPartCols() {
+      return posOfDynPartCols;
+    }
+
+    /**
      * @param posOfPartCols the posOfPartCols to set
      */
     protected void setPosOfPartCols(List<Integer> posOfPartCols) {
@@ -78,6 +90,14 @@ class OutputJobInfo implements Serializa
       this.posOfPartCols = posOfPartCols;
     }
 
+    /**
+     * @param posOfDynPartCols the posOfDynPartCols to set
+     */
+    protected void setPosOfDynPartCols(List<Integer> posOfDynPartCols) {
+      // Important - no sorting here! Order is retained; it is used to match keys with values at runtime
+      this.posOfDynPartCols = posOfDynPartCols;
+    }
+
     public OutputJobInfo(HCatTableInfo tableInfo, HCatSchema outputSchema, HCatSchema tableSchema,
         StorerInfo storerInfo, String location, Table table) {
       super();
@@ -139,4 +159,36 @@ class OutputJobInfo implements Serializa
       return table;
     }
 
+    /**
+     * Sets the maximum number of allowable dynamic partitions
+     * @param maxDynamicPartitions
+     */
+    public void setMaximumDynamicPartitions(int maxDynamicPartitions){
+      this.maxDynamicPartitions = maxDynamicPartitions;
+    }
+    
+    /**
+     * Returns the maximum number of allowable dynamic partitions
+     * @return the maximum number of allowable dynamic partitions
+     */
+    public int getMaxDynamicPartitions() {
+      return this.maxDynamicPartitions;
+    }
+
+    /**
+     * Sets whether or not Hadoop archiving has been requested for this job
+     * @param harRequested
+     */
+    public void setHarRequested(boolean harRequested){
+      this.harRequested = harRequested;
+    }
+    
+    /**
+     * Returns whether or not Hadoop archiving has been requested for this job
+     * @return true if Hadoop archiving has been requested for this job
+     */
+    public boolean getHarRequested() {
+      return this.harRequested;
+    }
+
 }
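A sketch of how a writer-side check against the new limit might look (hypothetical helper, not from this commit; assumes maxDynamicPartitions was populated via setMaximumDynamicPartitions and that the caller tracks how many partitions it has created):

    import java.io.IOException;

    // Fail fast once the number of dynamic partitions exceeds the configured ceiling.
    static void checkDynamicPartitionCeiling(OutputJobInfo jobInfo, int partitionsCreated)
        throws IOException {
      if (partitionsCreated > jobInfo.getMaxDynamicPartitions()) {
        throw new IOException("Number of dynamic partitions created ("
            + partitionsCreated + ") exceeds the configured maximum ("
            + jobInfo.getMaxDynamicPartitions() + ")");
      }
    }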

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java Fri Jul 22 23:38:07 2011
@@ -145,7 +145,7 @@ public class HCatEximStorer extends HCat
       //In local mode, mapreduce will not call HCatOutputCommitter.cleanupJob.
       //Calling it from here so that the partition publish happens.
       //This call needs to be removed after MAPREDUCE-1447 is fixed.
-      new HCatEximOutputCommitter(null).cleanupJob(job);
+      new HCatEximOutputCommitter(job,null).cleanupJob(job);
     }
   }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Fri Jul 22 23:38:07 2011
@@ -109,34 +109,35 @@ public class HCatStorer extends HCatBase
       computedSchema = convertPigSchemaToHCatSchema(pigSchema,hcatTblSchema);
       HCatOutputFormat.setSchema(job, computedSchema);
       p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO, config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-      if(config.get(HCatConstants.HCAT_KEY_HIVE_CONF) != null){
-        p.setProperty(HCatConstants.HCAT_KEY_HIVE_CONF, config.get(HCatConstants.HCAT_KEY_HIVE_CONF));
-      }
-      if(config.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null){
-        p.setProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE,
-            config.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
-      }
+      
+      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_HIVE_CONF);
+      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
+      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
+      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
+      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+      
       p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
 
     }else{
       config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-      if(p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF) != null){
-        config.set(HCatConstants.HCAT_KEY_HIVE_CONF, p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF));
-      }
-      if(p.getProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null){
-        config.set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE,
-            p.getProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
-      }
+      
+      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_HIVE_CONF);
+      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
+      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
+      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
+      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+      
     }
   }
 
+
   @Override
   public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
     if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
       //In local mode, mapreduce will not call HCatOutputCommitter.cleanupJob.
       //Calling it from here so that the partition publish happens.
       //This call needs to be removed after MAPREDUCE-1447 is fixed.
-      new HCatOutputCommitter(null).cleanupJob(job);
+      new HCatOutputCommitter(job,null).cleanupJob(job);
     }
   }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java Fri Jul 22 23:38:07 2011
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -405,4 +406,16 @@ public static Object extractPigObject(Ob
 
   }
 
+  public static void getConfigFromUDFProperties(Properties p, Configuration config, String propName) {
+    if(p.getProperty(propName) != null){
+      config.set(propName, p.getProperty(propName));
+    }
+  }
+
+  public static void saveConfigIntoUDFProperties(Properties p, Configuration config, String propName) {
+    if(config.get(propName) != null){
+      p.setProperty(propName, config.get(propName));
+    }
+  }
+
 }
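These two helpers factor out the repeated null-check-and-copy blocks that HCatStorer carried inline (see the HCatStorer hunk above). The round trip, as used there, with p being the UDF-context Properties and config the job Configuration (Pig does not forward the front-end Configuration to back-end tasks, hence the detour through Properties):

    // Front end: stash the value, if set, into the UDF properties.
    PigHCatUtil.saveConfigIntoUDFProperties(p, config, HCatConstants.HCAT_KEY_HIVE_CONF);

    // Back end: restore it, if present, into the task-side Configuration.
    PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_HIVE_CONF);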

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -19,6 +19,8 @@ package org.apache.hcatalog.rcfile;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java Fri Jul 22 23:38:07 2011
@@ -45,6 +45,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Fri Jul 22 23:38:07 2011
@@ -243,7 +243,7 @@ public abstract class HCatMapReduceTest 
 
   void runMRCreate(Map<String, String> partitionValues,
         List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
-        int writeCount) throws Exception {
+        int writeCount, boolean assertWrite) throws Exception {
 
     writeRecords = records;
     MapCreate.writeCount = 0;
@@ -275,8 +275,11 @@ public abstract class HCatMapReduceTest 
 
     //new HCatOutputCommitter(null).setupJob(job);
     job.waitForCompletion(true);
-    new HCatOutputCommitter(null).cleanupJob(job);
-    Assert.assertEquals(writeCount, MapCreate.writeCount);
+    new HCatOutputCommitter(job,null).cleanupJob(job);
+    if (assertWrite){
+      // assert the write count only when the caller expects this run to have written successfully.
+      Assert.assertEquals(writeCount, MapCreate.writeCount);
+    }
   }
 
   List<HCatRecord> runMRRead(int readCount) throws Exception {

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1149763&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java Fri Jul 22 23:38:07 2011
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+
+public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
+
+  private List<HCatRecord> writeRecords;
+  private List<HCatFieldSchema> dataColumns;
+
+  @Override
+  protected void initialize() throws Exception {
+
+    tableName = "testHCatDynamicPartitionedTable";
+    generateWriteRecords(20,5,0);
+    generateDataColumns();
+  }
+
+  private void generateDataColumns() throws HCatException {
+    dataColumns = new ArrayList<HCatFieldSchema>();
+    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", Constants.STRING_TYPE_NAME, "")));
+  }
+
+  private void generateWriteRecords(int max, int mod,int offset) {
+    writeRecords = new ArrayList<HCatRecord>();
+
+    for(int i = 0;i < max;i++) {
+      List<Object> objList = new ArrayList<Object>();
+
+      objList.add(i);
+      objList.add("strvalue" + i);
+      objList.add(String.valueOf((i % mod)+offset));
+      writeRecords.add(new DefaultHCatRecord(objList));
+    }
+  }
+
+  @Override
+  protected List<FieldSchema> getPartitionKeys() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    fields.add(new FieldSchema("p1", Constants.STRING_TYPE_NAME, ""));
+    return fields;
+  }
+
+  @Override
+  protected List<FieldSchema> getTableColumns() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+    fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+    return fields;
+  }
+
+
+  public void testHCatDynamicPartitionedTable() throws Exception {
+
+    generateWriteRecords(20,5,0);
+    runMRCreate(null, dataColumns, writeRecords, 20,true);
+
+    runMRRead(20);
+
+    //Read with partition filter
+    runMRRead(4, "p1 = \"0\"");
+    runMRRead(8, "p1 = \"1\" or p1 = \"3\"");
+    runMRRead(4, "p1 = \"4\"");
+
+    // read from hive to verify the published partitions
+    
+    String query = "select * from " + tableName;
+    int retCode = driver.run(query).getResponseCode();
+
+    if( retCode != 0 ) {
+      throw new Exception("Error " + retCode + " running query " + query);
+    }
+
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    assertEquals(20, res.size());
+
+    
+    //Test for duplicate publish
+    IOException exc = null;
+    try {
+      generateWriteRecords(20,5,0);
+      runMRCreate(null, dataColumns, writeRecords, 20,false);
+    } catch(IOException e) {
+      exc = e;
+    }
+
+    assertTrue(exc != null);
+    assertTrue(exc instanceof HCatException);
+    assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType());
+  }
+
+  public void testHCatDynamicPartitionMaxPartitions() throws Exception {
+    HiveConf hc = new HiveConf(this.getClass());
+    
+    int maxParts = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
+    System.out.println("Max partitions allowed = " + maxParts);
+
+    IOException exc = null;
+    try {
+      generateWriteRecords(maxParts+5,maxParts+2,10);
+      runMRCreate(null,dataColumns,writeRecords,maxParts+5,false);
+    } catch(IOException e) {
+      exc = e;
+    }
+
+    if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
+      assertTrue(exc != null);
+      assertTrue(exc instanceof HCatException);
+      assertEquals(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, ((HCatException) exc).getErrorType());
+    }else{
+      assertTrue(exc == null);
+      runMRRead(maxParts+5);
+    }
+  }
+}
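A note on the numbers asserted in testHCatDynamicPartitionedTable: generateWriteRecords(20, 5, 0) sets p1 to String.valueOf((i % 5) + 0), so the 20 records fan out into five dynamic partitions "0" through "4" with four rows each. A quick arithmetic sketch (illustrative only):

    // 20 records, p1 = (i % 5) + 0  =>  {"0"=4, "1"=4, "2"=4, "3"=4, "4"=4}
    java.util.Map<String, Integer> fanout = new java.util.HashMap<String, Integer>();
    for (int i = 0; i < 20; i++) {
      String p1 = String.valueOf((i % 5) + 0);
      Integer n = fanout.get(p1);
      fanout.put(p1, n == null ? 1 : n + 1);
    }
    // matches runMRRead(4, "p1 = \"0\"") and runMRRead(8, "p1 = \"1\" or p1 = \"3\"") above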

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java Fri Jul 22 23:38:07 2011
@@ -228,7 +228,7 @@ public class TestHCatEximInputFormat ext
         schema);
 
     job.waitForCompletion(true);
-    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null);
     committer.cleanupJob(job);
   }
 
@@ -247,7 +247,7 @@ public class TestHCatEximInputFormat ext
         schema);
 
     job.waitForCompletion(true);
-    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null);
     committer.cleanupJob(job);
   }
 

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -109,7 +109,7 @@ public class TestHCatEximOutputFormat ex
           schema);
 
       job.waitForCompletion(true);
-      HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+      HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null);
       committer.cleanupJob(job);
 
       Path metadataPath = new Path(outputLocation, "_metadata");
@@ -165,7 +165,7 @@ public class TestHCatEximOutputFormat ex
           schema);
 
       job.waitForCompletion(true);
-      HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+      HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null);
       committer.cleanupJob(job);
       Path metadataPath = new Path(outputLocation, "_metadata");
       Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath);

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java Fri Jul 22 23:38:07 2011
@@ -78,12 +78,12 @@ public class TestHCatNonPartitioned exte
   public void testHCatNonPartitionedTable() throws Exception {
 
     Map<String, String> partitionMap = new HashMap<String, String>();
-    runMRCreate(null, partitionColumns, writeRecords, 10);
+    runMRCreate(null, partitionColumns, writeRecords, 10,true);
 
     //Test for duplicate publish
     IOException exc = null;
     try {
-      runMRCreate(null,  partitionColumns, writeRecords, 20);
+      runMRCreate(null,  partitionColumns, writeRecords, 20,true);
     } catch(IOException e) {
       exc = e;
     }
@@ -98,7 +98,7 @@ public class TestHCatNonPartitioned exte
     partitionMap.put("px", "p1value2");
 
     try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+      runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
     } catch(IOException e) {
       exc = e;
     }

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -160,7 +160,7 @@ public class TestHCatOutputFormat extend
   }
 
   public void publishTest(Job job) throws Exception {
-    OutputCommitter committer = new HCatOutputCommitter(null);
+    OutputCommitter committer = new HCatOutputCommitter(job,null);
     committer.cleanupJob(job);
 
     Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java Fri Jul 22 23:38:07 2011
@@ -80,17 +80,17 @@ public class TestHCatPartitioned extends
     Map<String, String> partitionMap = new HashMap<String, String>();
     partitionMap.put("part1", "p1value1");
 
-    runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+    runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
 
     partitionMap.clear();
     partitionMap.put("PART1", "p1value2");
 
-    runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+    runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
 
     //Test for duplicate publish
     IOException exc = null;
     try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+      runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
     } catch(IOException e) {
       exc = e;
     }
@@ -105,7 +105,7 @@ public class TestHCatPartitioned extends
     partitionMap.put("px", "p1value2");
 
     try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+      runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
     } catch(IOException e) {
       exc = e;
     }
@@ -118,14 +118,15 @@ public class TestHCatPartitioned extends
     //Test for null partition value map
     exc = null;
     try {
-      runMRCreate(null, partitionColumns, writeRecords, 20);
+      runMRCreate(null, partitionColumns, writeRecords, 20,false);
     } catch(IOException e) {
       exc = e;
     }
 
-    assertTrue(exc != null);
-    assertTrue(exc instanceof HCatException);
-    assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
+    assertTrue(exc == null);
+//    assertTrue(exc instanceof HCatException);
+//    assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType());
+    // With dynamic partitioning, a missing/partial partition-value spec is no longer an error; unspecified key values are inferred at write time
 
     //Read should get 10 + 20 rows
     runMRRead(30);
@@ -166,7 +167,7 @@ public class TestHCatPartitioned extends
     Map<String, String> partitionMap = new HashMap<String, String>();
     partitionMap.put("part1", "p1value5");
 
-    runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+    runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
 
     tableSchema = getTableSchema();
 
@@ -187,7 +188,7 @@ public class TestHCatPartitioned extends
 
     IOException exc = null;
     try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+      runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
     } catch(IOException e) {
       exc = e;
     }
@@ -217,7 +218,7 @@ public class TestHCatPartitioned extends
 
     exc = null;
     try {
-      runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20);
+      runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20,true);
     } catch(IOException e) {
       exc = e;
     }
@@ -266,7 +267,7 @@ public class TestHCatPartitioned extends
 
     Exception exc = null;
     try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+      runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
     } catch(IOException e) {
       exc = e;
     }
@@ -291,7 +292,7 @@ public class TestHCatPartitioned extends
       writeRecords.add(new DefaultHCatRecord(objList));
     }
 
-    runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+    runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
 
     //Read should get 10 + 20 + 10 + 10 + 20 rows
     runMRRead(70);

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java Fri Jul 22 23:38:07 2011
@@ -585,4 +585,122 @@ public class TestHCatStorer extends Test
    assertFalse(itr.hasNext());
 
   }
+  
+
+  public void testDynamicPartitioningMultiPartColsInDataPartialSpec() throws IOException, CommandNeedRetryException{
+
+    driver.run("drop table if exists employee");
+    String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
+        " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE " +
+        "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+        "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+
+    MiniCluster.deleteFile(cluster, fullFileName);
+    String[] inputData = {"111237\tKrishna\t01/01/1990\tM\tIN\tTN",
+                          "111238\tKalpana\t01/01/2000\tF\tIN\tKA",
+                          "111239\tSatya\t01/01/2001\tM\tIN\tKL",
+                          "111240\tKavya\t01/01/2002\tF\tIN\tAP"};
+
+    MiniCluster.createInputFile(cluster, fullFileName, inputData);
+    PigServer pig = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    pig.setBatchOn();
+    pig.registerQuery("A = LOAD '"+fullFileName+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
+        "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
+    pig.registerQuery("IN = FILTER A BY emp_country == 'IN';");
+    pig.registerQuery("STORE IN INTO 'employee' USING "+HCatStorer.class.getName()+"('emp_country=IN');");
+    pig.executeBatch();
+    driver.run("select * from employee");
+    ArrayList<String> results = new ArrayList<String>();
+    driver.getResults(results);
+    assertEquals(4, results.size());
+    Collections.sort(results);
+    assertEquals(inputData[0], results.get(0));
+    assertEquals(inputData[1], results.get(1));
+    assertEquals(inputData[2], results.get(2));
+    assertEquals(inputData[3], results.get(3));
+    MiniCluster.deleteFile(cluster, fullFileName);
+    driver.run("drop table employee");
+  }
+
+  public void testDynamicPartitioningMultiPartColsInDataNoSpec() throws IOException, CommandNeedRetryException{
+
+    driver.run("drop table if exists employee");
+    String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
+        " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE " +
+        "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+        "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+
+    MiniCluster.deleteFile(cluster, fullFileName);
+    String[] inputData = {"111237\tKrishna\t01/01/1990\tM\tIN\tTN",
+                          "111238\tKalpana\t01/01/2000\tF\tIN\tKA",
+                          "111239\tSatya\t01/01/2001\tM\tIN\tKL",
+                          "111240\tKavya\t01/01/2002\tF\tIN\tAP"};
+
+    MiniCluster.createInputFile(cluster, fullFileName, inputData);
+    PigServer pig = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    pig.setBatchOn();
+    pig.registerQuery("A = LOAD '"+fullFileName+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
+        "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
+    pig.registerQuery("IN = FILTER A BY emp_country == 'IN';");
+    pig.registerQuery("STORE IN INTO 'employee' USING "+HCatStorer.class.getName()+"();");
+    pig.executeBatch();
+    driver.run("select * from employee");
+    ArrayList<String> results = new ArrayList<String>();
+    driver.getResults(results);
+    assertEquals(4, results.size());
+    Collections.sort(results);
+    assertEquals(inputData[0], results.get(0));
+    assertEquals(inputData[1], results.get(1));
+    assertEquals(inputData[2], results.get(2));
+    assertEquals(inputData[3], results.get(3));
+    MiniCluster.deleteFile(cluster, fullFileName);
+    driver.run("drop table employee");
+  }
+
+    public void testDynamicPartitioningMultiPartColsNoDataInDataNoSpec() throws IOException, CommandNeedRetryException{
+
+      driver.run("drop table if exists employee");
+      String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
+          " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE " +
+          "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+          "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+
+      int retCode = driver.run(createTable).getResponseCode();
+      if(retCode != 0) {
+        throw new IOException("Failed to create table.");
+      }
+
+      MiniCluster.deleteFile(cluster, fullFileName);
+      String[] inputData = {};
+
+      MiniCluster.createInputFile(cluster, fullFileName, inputData);
+      PigServer pig = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+      pig.setBatchOn();
+      pig.registerQuery("A = LOAD '"+fullFileName+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
+          "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
+      pig.registerQuery("IN = FILTER A BY emp_country == 'IN';");
+      pig.registerQuery("STORE IN INTO 'employee' USING "+HCatStorer.class.getName()+"();");
+      pig.executeBatch();
+      driver.run("select * from employee");
+      ArrayList<String> results = new ArrayList<String>();
+      driver.getResults(results);
+      assertEquals(0, results.size());
+      MiniCluster.deleteFile(cluster, fullFileName);
+      driver.run("drop table employee");
+    }
+
+
 }