Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/09/21 18:37:05 UTC

svn commit: r1388607 - in /incubator/hcatalog/branches/branch-0.4: ./ src/java/org/apache/hcatalog/mapreduce/ src/test/org/apache/hcatalog/mapreduce/

Author: toffer
Date: Fri Sep 21 18:37:05 2012
New Revision: 1388607

URL: http://svn.apache.org/viewvc?rev=1388607&view=rev
Log:
merged from trunk: HCAT-490 HCatStorer() throws an error when the same partition key is present in records in more than one task running as part of the same job (amalakar via toffer)
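
For context, the failure mode being fixed: with dynamic partitioning, each map task called the base OutputFormat's checkOutputSpecs() for every new partition it encountered, and a FileOutputFormat-style implementation rejects an output directory that already exists. When two tasks of the same job wrote records with the same partition value, the second task therefore failed. The sketch below is illustrative only (not HCatalog code) and assumes a FileOutputFormat-like base format:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileAlreadyExistsException;
    import org.apache.hadoop.mapred.JobConf;

    public class PartitionOutputCheck {
        // Roughly what a FileOutputFormat-style checkOutputSpecs() does for a
        // partition's output directory; the second task to reach the same
        // partition sees the directory created by the first one and fails here.
        static void checkPartitionDir(JobConf conf, Path partitionDir) throws IOException {
            FileSystem fs = partitionDir.getFileSystem(conf);
            if (fs.exists(partitionDir)) {
                throw new FileAlreadyExistsException(
                    "Output directory " + partitionDir + " already exists");
            }
        }
    }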

Modified:
    incubator/hcatalog/branches/branch-0.4/CHANGES.txt
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
    incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
    incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
    incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
    incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java

Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1388607&r1=1388606&r2=1388607&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Fri Sep 21 18:37:05 2012
@@ -63,6 +63,8 @@ Trunk (unreleased changes)
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-490 HCatStorer() throws error when the same partition key is present in records in more than one tasks running as part of the same job (amalakar via toffer)
+
   HCAT-494 MultiOutputFormat in 0.23 fails to setAliasConf() correctly. (mithun via toffer)
 
   HCAT-507 e2e harness failing on 0.23 (toffer)

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java?rev=1388607&r1=1388606&r2=1388607&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java Fri Sep 21 18:37:05 2012
@@ -82,17 +82,17 @@ class FileRecordWriterContainer extends 
      */
     public FileRecordWriterContainer(org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
                                      TaskAttemptContext context) throws IOException, InterruptedException {
-        super(context,baseWriter);
+        super(context, baseWriter);
         this.context = context;
         jobInfo = HCatOutputFormat.getJobInfo(context);
 
         storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
-        serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),context.getConfiguration());
+        serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration());
         objectInspector = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema());
         try {
             InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo);
         } catch (SerDeException e) {
-            throw new IOException("Failed to inialize SerDe",e);
+            throw new IOException("Failed to inialize SerDe", e);
         }
 
         // If partition columns occur in data, we want to remove them.
@@ -101,9 +101,9 @@ class FileRecordWriterContainer extends 
         dynamicPartCols = jobInfo.getPosOfDynPartCols();
         maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
 
-        if((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))){
+        if ((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))) {
             throw new HCatException("It seems that setSchema() is not called on " +
-                    "HCatOutputFormat. Please make sure that method is called.");
+                "HCatOutputFormat. Please make sure that method is called.");
         }
 
 
@@ -114,10 +114,9 @@ class FileRecordWriterContainer extends 
             this.dynamicContexts = null;
             this.dynamicObjectInspectors = null;
             this.dynamicOutputJobInfo = null;
-        }
-        else {
-            this.baseDynamicSerDe = new HashMap<String,SerDe>();
-            this.baseDynamicWriters = new HashMap<String,org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>>();
+        } else {
+            this.baseDynamicSerDe = new HashMap<String, SerDe>();
+            this.baseDynamicWriters = new HashMap<String, org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>>();
             this.baseDynamicCommitters = new HashMap<String, org.apache.hadoop.mapred.OutputCommitter>();
             this.dynamicContexts = new HashMap<String, org.apache.hadoop.mapred.TaskAttemptContext>();
             this.dynamicObjectInspectors = new HashMap<String, ObjectInspector>();
@@ -134,17 +133,17 @@ class FileRecordWriterContainer extends 
 
     @Override
     public void close(TaskAttemptContext context) throws IOException,
-            InterruptedException {
+        InterruptedException {
         Reporter reporter = InternalUtil.createReporter(context);
-        if (dynamicPartitioningUsed){
-            for (org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()){
+        if (dynamicPartitioningUsed) {
+            for (org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()) {
                 //We are in RecordWriter.close() make sense that the context would be TaskInputOutput
                 bwriter.close(reporter);
             }
-            for(Map.Entry<String,org.apache.hadoop.mapred.OutputCommitter>entry : baseDynamicCommitters.entrySet()) {
+            for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry : baseDynamicCommitters.entrySet()) {
                 org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
                 OutputCommitter baseOutputCommitter = entry.getValue();
-                if (baseOutputCommitter.needsTaskCommit(currContext)){
+                if (baseOutputCommitter.needsTaskCommit(currContext)) {
                     baseOutputCommitter.commitTask(currContext);
                 }
                 org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currContext);
@@ -157,92 +156,96 @@ class FileRecordWriterContainer extends 
 
     @Override
     public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
-            InterruptedException {
+        InterruptedException {
         org.apache.hadoop.mapred.RecordWriter localWriter;
         ObjectInspector localObjectInspector;
         SerDe localSerDe;
         OutputJobInfo localJobInfo = null;
 
-        if (dynamicPartitioningUsed){
+        if (dynamicPartitioningUsed) {
             // calculate which writer to use from the remaining values - this needs to be done before we delete cols
             List<String> dynamicPartValues = new ArrayList<String>();
-            for (Integer colToAppend :  dynamicPartCols){
+            for (Integer colToAppend : dynamicPartCols) {
                 dynamicPartValues.add(value.get(colToAppend).toString());
             }
 
             String dynKey = dynamicPartValues.toString();
-            if (!baseDynamicWriters.containsKey(dynKey)){
-                if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)){
+            if (!baseDynamicWriters.containsKey(dynKey)) {
+                if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) {
                     throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
-                            "Number of dynamic partitions being created "
-                                    + "exceeds configured max allowable partitions["
-                                    + maxDynamicPartitions
-                                    + "], increase parameter ["
-                                    + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
-                                    + "] if needed.");
+                        "Number of dynamic partitions being created "
+                            + "exceeds configured max allowable partitions["
+                            + maxDynamicPartitions
+                            + "], increase parameter ["
+                            + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+                            + "] if needed.");
                 }
 
                 org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context);
                 configureDynamicStorageHandler(currTaskContext, dynamicPartValues);
-                localJobInfo= HCatBaseOutputFormat.getJobInfo(currTaskContext);
+                localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext);
 
                 //setup serDe
                 SerDe currSerDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf());
                 try {
                     InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(), localJobInfo);
                 } catch (SerDeException e) {
-                    throw new IOException("Failed to initialize SerDe",e);
+                    throw new IOException("Failed to initialize SerDe", e);
                 }
 
                 //create base OutputFormat
                 org.apache.hadoop.mapred.OutputFormat baseOF =
-                        ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf());
-                //check outputSpecs
-                baseOF.checkOutputSpecs(null,currTaskContext.getJobConf());
+                    ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf());
                 //get Output Committer
-                org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter =  currTaskContext.getJobConf().getOutputCommitter();
+                org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = currTaskContext.getJobConf().getOutputCommitter();
                 //create currJobContext the latest so it gets all the config changes
                 org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currTaskContext);
+
+                //We are skipping calling checkOutputSpecs() for each partition
+                //As it can throw a FileAlreadyExistsException when more than one mapper is writing to a partition 
+                //See HCATALOG-490, also to avoid contacting the namenode for each new FileOutputFormat instance
+                //In general this should be ok for most FileOutputFormat implementations
+		//but may become an issue for cases when the method is used to perform other setup tasks
+
                 //setupJob()
                 baseOutputCommitter.setupJob(currJobContext);
                 //recreate to refresh jobConf of currTask context
                 currTaskContext =
-                        HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
-                                                                                        currTaskContext.getTaskAttemptID(),
-                                                                                        currTaskContext.getProgressible());
+                    HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
+                        currTaskContext.getTaskAttemptID(),
+                        currTaskContext.getProgressible());
                 //set temp location
                 currTaskContext.getConfiguration().set("mapred.work.output.dir",
-                                new FileOutputCommitter(new Path(localJobInfo.getLocation()),currTaskContext).getWorkPath().toString());
+                    new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext).getWorkPath().toString());
                 //setupTask()
                 baseOutputCommitter.setupTask(currTaskContext);
 
                 org.apache.hadoop.mapred.RecordWriter baseRecordWriter =
-                        baseOF.getRecordWriter(null,
-                                                            currTaskContext.getJobConf(),
-                                                            FileOutputFormat.getUniqueFile(currTaskContext, "part", ""),
-                                                            InternalUtil.createReporter(currTaskContext));
+                    baseOF.getRecordWriter(null,
+                        currTaskContext.getJobConf(),
+                        FileOutputFormat.getUniqueFile(currTaskContext, "part", ""),
+                        InternalUtil.createReporter(currTaskContext));
 
                 baseDynamicWriters.put(dynKey, baseRecordWriter);
-                baseDynamicSerDe.put(dynKey,currSerDe);
-                baseDynamicCommitters.put(dynKey,baseOutputCommitter);
-                dynamicContexts.put(dynKey,currTaskContext);
-                dynamicObjectInspectors.put(dynKey,InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
+                baseDynamicSerDe.put(dynKey, currSerDe);
+                baseDynamicCommitters.put(dynKey, baseOutputCommitter);
+                dynamicContexts.put(dynKey, currTaskContext);
+                dynamicObjectInspectors.put(dynKey, InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
                 dynamicOutputJobInfo.put(dynKey, HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey)));
             }
-            
+
             localJobInfo = dynamicOutputJobInfo.get(dynKey);
             localWriter = baseDynamicWriters.get(dynKey);
             localSerDe = baseDynamicSerDe.get(dynKey);
             localObjectInspector = dynamicObjectInspectors.get(dynKey);
-        }
-        else{
+        } else {
             localJobInfo = jobInfo;
             localWriter = getBaseRecordWriter();
             localSerDe = serDe;
             localObjectInspector = objectInspector;
         }
 
-        for(Integer colToDel : partColsToDel){
+        for (Integer colToDel : partColsToDel) {
             value.remove(colToDel);
         }
 
@@ -251,7 +254,7 @@ class FileRecordWriterContainer extends 
         try {
             localWriter.write(NullWritable.get(), localSerDe.serialize(value.getAll(), localObjectInspector));
         } catch (SerDeException e) {
-            throw new IOException("Failed to serialize object",e);
+            throw new IOException("Failed to serialize object", e);
         }
     }
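
In the write() path above, per-partition writers, committers, SerDes, and task contexts are cached in maps keyed by the string form of the record's dynamic partition values. A minimal, self-contained sketch of that keying, assuming the record is a plain List<Object> and the dynamic partition column positions come from the job info (class and method names here are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class DynPartKey {
        // Mirrors dynamicPartValues.toString() in FileRecordWriterContainer.write():
        // the dynamic partition column values, in order, rendered as a list string.
        static String keyFor(List<Object> record, List<Integer> dynamicPartCols) {
            List<String> dynamicPartValues = new ArrayList<String>();
            for (Integer colToAppend : dynamicPartCols) {
                dynamicPartValues.add(record.get(colToAppend).toString());
            }
            return dynamicPartValues.toString();
        }

        public static void main(String[] args) {
            // A record (c1=7, c2="strvalue7", p1="2") with the partition column at
            // position 2 maps to the key "[2]", so every record for p1=2 reuses
            // the same base RecordWriter within a task.
            List<Object> record = Arrays.<Object>asList(7, "strvalue7", "2");
            System.out.println(keyFor(record, Arrays.asList(2)));   // prints [2]
        }
    }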
 

Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1388607&r1=1388606&r2=1388607&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Fri Sep 21 18:37:05 2012
@@ -25,328 +25,342 @@ import java.util.List;
 import java.util.Map;
 
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test for HCatOutputFormat. Writes a partition using HCatOutputFormat and reads
  * it back using HCatInputFormat, checks the column values and counts.
  */
-public abstract class HCatMapReduceTest extends TestCase {
-
-  private static final Logger LOG = LoggerFactory.getLogger(HCatMapReduceTest.class);
-  protected String dbName = "default";
-  protected String tableName = "testHCatMapReduceTable";
-
-  protected String inputFormat = RCFileInputFormat.class.getName();
-  protected String outputFormat = RCFileOutputFormat.class.getName();
-  protected String serdeClass = ColumnarSerDe.class.getName();
-
-  private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
-  private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
-
-  protected abstract void initialize() throws Exception;
-
-  protected abstract List<FieldSchema> getPartitionKeys();
-
-  protected abstract List<FieldSchema> getTableColumns();
+public abstract class HCatMapReduceTest extends HCatBaseTest {
 
-  private HiveMetaStoreClient client;
-  protected HiveConf hiveConf;
+    private static final Logger LOG = LoggerFactory.getLogger(HCatMapReduceTest.class);
+    protected static String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+    protected static String tableName = "testHCatMapReduceTable";
 
-  private FileSystem fs;
-  private String thriftUri = null;
+    protected String inputFormat = RCFileInputFormat.class.getName();
+    protected String outputFormat = RCFileOutputFormat.class.getName();
+    protected String serdeClass = ColumnarSerDe.class.getName();
 
-  protected Driver driver;
+    private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
+    private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
 
-  @Override
-  protected void setUp() throws Exception {
-    hiveConf = new HiveConf(this.getClass());
+    protected abstract List<FieldSchema> getPartitionKeys();
 
-    //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook
-    //is present only in the ql/test directory
-    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
-    driver = new Driver(hiveConf);
-    SessionState.start(new CliSessionState(hiveConf));
+    protected abstract List<FieldSchema> getTableColumns();
 
-    thriftUri = System.getenv("HCAT_METASTORE_URI");
+    private static FileSystem fs;
 
-    if( thriftUri != null ) {
-      LOG.info("Using URI {}", thriftUri);
-
-      hiveConf.set("hive.metastore.local", "false");
-      hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
+    @BeforeClass
+    public static void setUpOneTime() throws Exception {
+        fs = new LocalFileSystem();
+        fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
+        MapCreate.writeCount = 0;
+        MapRead.readCount = 0;
     }
 
-    fs = new LocalFileSystem();
-    fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
-
-    initialize();
-
-    client = new HiveMetaStoreClient(hiveConf, null);
-    initTable();
-  }
-
-  @Override
-  protected void tearDown() throws Exception {
-    try {
-      String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+    @After
+    public void deleteTable() throws Exception {
+        try {
+            String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
 
-      client.dropTable(databaseName, tableName);
-    } catch(Exception e) {
-      e.printStackTrace();
-      throw e;
+            client.dropTable(databaseName, tableName);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
     }
 
-    client.close();
-  }
-
-
-
-  private void initTable() throws Exception {
-
-    String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
-
-    try {
-      client.dropTable(databaseName, tableName);
-    } catch(Exception e) {
-    } //can fail with NoSuchObjectException
-
-
-    Table tbl = new Table();
-    tbl.setDbName(databaseName);
-    tbl.setTableName(tableName);
-    tbl.setTableType("MANAGED_TABLE");
-    StorageDescriptor sd = new StorageDescriptor();
-
-    sd.setCols(getTableColumns());
-    tbl.setPartitionKeys(getPartitionKeys());
-
-    tbl.setSd(sd);
-
-    sd.setBucketCols(new ArrayList<String>(2));
-    sd.setSerdeInfo(new SerDeInfo());
-    sd.getSerdeInfo().setName(tbl.getTableName());
-    sd.getSerdeInfo().setParameters(new HashMap<String, String>());
-    sd.getSerdeInfo().getParameters().put(
-        org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
-    sd.getSerdeInfo().setSerializationLib(serdeClass);
-    sd.setInputFormat(inputFormat);
-    sd.setOutputFormat(outputFormat);
+    @Before
+    public void disableHiveClientCache() throws IOException, MetaException {
+        HiveConf hiveConf = new HiveConf();
+        hiveConf.setInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, 0);
+        // Hack to initialize cache with 0 expiry time causing it to return a new hive client every time
+        // Otherwise the cache doesn't play well with the second test method with the client gets closed() in the
+        // tearDown() of the previous test
+        HCatUtil.getHiveClient(hiveConf);
+    }
 
-    Map<String, String> tableParams = new HashMap<String, String>();
-    tbl.setParameters(tableParams);
+    @Before
+    public void createTable() throws Exception {
+        String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
 
-    client.createTable(tbl);
-  }
+        try {
+            client.dropTable(databaseName, tableName);
+        } catch (Exception e) {
+        } //can fail with NoSuchObjectException
+
+
+        Table tbl = new Table();
+        tbl.setDbName(databaseName);
+        tbl.setTableName(tableName);
+        tbl.setTableType("MANAGED_TABLE");
+        StorageDescriptor sd = new StorageDescriptor();
+
+        sd.setCols(getTableColumns());
+        tbl.setPartitionKeys(getPartitionKeys());
+
+        tbl.setSd(sd);
+
+        sd.setBucketCols(new ArrayList<String>(2));
+        sd.setSerdeInfo(new SerDeInfo());
+        sd.getSerdeInfo().setName(tbl.getTableName());
+        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+        sd.getSerdeInfo().getParameters().put(
+            org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
+        sd.getSerdeInfo().setSerializationLib(serdeClass);
+        sd.setInputFormat(inputFormat);
+        sd.setOutputFormat(outputFormat);
 
-  //Create test input file with specified number of rows
-  private void createInputFile(Path path, int rowCount) throws IOException {
+        Map<String, String> tableParams = new HashMap<String, String>();
+        tbl.setParameters(tableParams);
 
-    if( fs.exists(path) ) {
-      fs.delete(path, true);
+        client.createTable(tbl);
     }
 
-    FSDataOutputStream os = fs.create(path);
+    //Create test input file with specified number of rows
+    private void createInputFile(Path path, int rowCount) throws IOException {
 
-    for(int i = 0;i < rowCount;i++) {
-      os.writeChars(i + "\n");
-    }
+        if (fs.exists(path)) {
+            fs.delete(path, true);
+        }
 
-    os.close();
-  }
+        FSDataOutputStream os = fs.create(path);
 
-  public static class MapCreate extends
-  Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+        for (int i = 0; i < rowCount; i++) {
+            os.writeChars(i + "\n");
+        }
 
-    static int writeCount = 0; //test will be in local mode
+        os.close();
+    }
 
-    @Override
-    public void map(LongWritable key, Text value, Context context
-    ) throws IOException, InterruptedException {
-      {
-        try {
-          HCatRecord rec = writeRecords.get(writeCount);
-          context.write(null, rec);
-          writeCount++;
+    public static class MapCreate extends
+        Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
 
-        }catch(Exception e) {
+        static int writeCount = 0; //test will be in local mode
 
-          e.printStackTrace(System.err); //print since otherwise exception is lost
-          throw new IOException(e);
+        @Override
+        public void map(LongWritable key, Text value, Context context
+        ) throws IOException, InterruptedException {
+            {
+                try {
+                    HCatRecord rec = writeRecords.get(writeCount);
+                    context.write(null, rec);
+                    writeCount++;
+
+                } catch (Exception e) {
+
+                    e.printStackTrace(System.err); //print since otherwise exception is lost
+                    throw new IOException(e);
+                }
+            }
         }
-      }
     }
-  }
 
-  public static class MapRead extends
-  Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
+    public static class MapRead extends
+        Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
 
-    static int readCount = 0; //test will be in local mode
+        static int readCount = 0; //test will be in local mode
 
-    @Override
-    public void map(WritableComparable key, HCatRecord value, Context context
-    ) throws IOException, InterruptedException {
-      {
-        try {
-          readRecords.add(value);
-          readCount++;
-        } catch(Exception e) {
-          e.printStackTrace(); //print since otherwise exception is lost
-          throw new IOException(e);
+        @Override
+        public void map(WritableComparable key, HCatRecord value, Context context
+        ) throws IOException, InterruptedException {
+            {
+                try {
+                    readRecords.add(value);
+                    readCount++;
+                } catch (Exception e) {
+                    e.printStackTrace(); //print since otherwise exception is lost
+                    throw new IOException(e);
+                }
+            }
         }
-      }
     }
-  }
 
-  Job runMRCreate(Map<String, String> partitionValues,
-        List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
-        int writeCount, boolean assertWrite) throws Exception {
+    Job runMRCreate(Map<String, String> partitionValues,
+                    List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
+                    int writeCount, boolean assertWrite) throws Exception {
+        return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite, true);
+    }
 
-    writeRecords = records;
-    MapCreate.writeCount = 0;
+    /**
+     * Run a local map reduce job to load data from in memory records to an HCatalog Table
+     * @param partitionValues
+     * @param partitionColumns
+     * @param records data to be written to HCatalog table
+     * @param writeCount
+     * @param assertWrite
+     * @param asSingleMapTask
+     * @return
+     * @throws Exception
+     */
+    Job runMRCreate(Map<String, String> partitionValues,
+                    List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
+                    int writeCount, boolean assertWrite, boolean asSingleMapTask) throws Exception {
+
+        writeRecords = records;
+        MapCreate.writeCount = 0;
+
+        Configuration conf = new Configuration();
+        Job job = new Job(conf, "hcat mapreduce write test");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(HCatMapReduceTest.MapCreate.class);
+
+        // input/output settings
+        job.setInputFormatClass(TextInputFormat.class);
+
+        if (asSingleMapTask) {
+            // One input path would mean only one map task
+            Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+            createInputFile(path, writeCount);
+            TextInputFormat.setInputPaths(job, path);
+        } else {
+            // Create two input paths so that two map tasks get triggered. There could be other ways
+            // to trigger two map tasks.
+            Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+            createInputFile(path, writeCount / 2);
 
-    Configuration conf = new Configuration();
-    Job job = new Job(conf, "hcat mapreduce write test");
-    job.setJarByClass(this.getClass());
-    job.setMapperClass(HCatMapReduceTest.MapCreate.class);
+            Path path2 = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput2");
+            createInputFile(path2, (writeCount - writeCount / 2));
 
-    // input/output settings
-    job.setInputFormatClass(TextInputFormat.class);
+            TextInputFormat.setInputPaths(job, path, path2);
+        }
 
-    Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
-    createInputFile(path, writeCount);
+        job.setOutputFormatClass(HCatOutputFormat.class);
 
-    TextInputFormat.setInputPaths(job, path);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
+        HCatOutputFormat.setOutput(job, outputJobInfo);
 
-    job.setOutputFormatClass(HCatOutputFormat.class);
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(DefaultHCatRecord.class);
 
-    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
-    HCatOutputFormat.setOutput(job, outputJobInfo);
+        job.setNumReduceTasks(0);
 
-    job.setMapOutputKeyClass(BytesWritable.class);
-    job.setMapOutputValueClass(DefaultHCatRecord.class);
+        HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
 
-    job.setNumReduceTasks(0);
+        boolean success = job.waitForCompletion(true);
 
-    HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
+        // Ensure counters are set when data has actually been read.
+        if (partitionValues != null) {
+            assertTrue(job.getCounters().getGroup("FileSystemCounters")
+                .findCounter("FILE_BYTES_READ").getValue() > 0);
+        }
 
-    boolean success = job.waitForCompletion(true);
+        if (!HcatTestUtils.isHadoop23()) {
+            // Local mode outputcommitter hook is not invoked in Hadoop 1.x
+            if (success) {
+                new FileOutputCommitterContainer(job, null).commitJob(job);
+            } else {
+                new FileOutputCommitterContainer(job, null).abortJob(job, JobStatus.State.FAILED);
+            }
+        }
+        if (assertWrite) {
+            // we assert only if we expected to assert with this call.
+            Assert.assertEquals(writeCount, MapCreate.writeCount);
+        }
 
-    // Ensure counters are set when data has actually been read.
-    if (partitionValues != null) {
-      assertTrue(job.getCounters().getGroup("FileSystemCounters")
-          .findCounter("FILE_BYTES_READ").getValue() > 0);
+        return job;
     }
 
-    if (!HcatTestUtils.isHadoop23()) {
-        // Local mode outputcommitter hook is not invoked in Hadoop 1.x
-        if (success) {
-            new FileOutputCommitterContainer(job,null).commitJob(job);
-        } else {
-            new FileOutputCommitterContainer(job,null).abortJob(job, JobStatus.State.FAILED);
-        }
+    List<HCatRecord> runMRRead(int readCount) throws Exception {
+        return runMRRead(readCount, null);
     }
-    if (assertWrite){
-      // we assert only if we expected to assert with this call.
-      Assert.assertEquals(writeCount, MapCreate.writeCount);
-    }
-
-    return job;
-  }
-
-  List<HCatRecord> runMRRead(int readCount) throws Exception {
-    return runMRRead(readCount, null);
-  }
-
-  List<HCatRecord> runMRRead(int readCount, String filter) throws Exception {
-
-    MapRead.readCount = 0;
-    readRecords.clear();
-
-    Configuration conf = new Configuration();
-    Job job = new Job(conf, "hcat mapreduce read test");
-    job.setJarByClass(this.getClass());
-    job.setMapperClass(HCatMapReduceTest.MapRead.class);
 
-    // input/output settings
-    job.setInputFormatClass(HCatInputFormat.class);
-    job.setOutputFormatClass(TextOutputFormat.class);
-
-    InputJobInfo inputJobInfo = InputJobInfo.create(dbName,tableName,filter);
-    HCatInputFormat.setInput(job, inputJobInfo);
+    /**
+     * Run a local map reduce job to read records from HCatalog table and verify if the count is as expected
+     * @param readCount
+     * @param filter
+     * @return
+     * @throws Exception
+     */
+    List<HCatRecord> runMRRead(int readCount, String filter) throws Exception {
+
+        MapRead.readCount = 0;
+        readRecords.clear();
+
+        Configuration conf = new Configuration();
+        Job job = new Job(conf, "hcat mapreduce read test");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(HCatMapReduceTest.MapRead.class);
+
+        // input/output settings
+        job.setInputFormatClass(HCatInputFormat.class);
+        job.setOutputFormatClass(TextOutputFormat.class);
+
+        InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, filter);
+        HCatInputFormat.setInput(job, inputJobInfo);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(Text.class);
+
+        job.setNumReduceTasks(0);
+
+        Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceOutput");
+        if (fs.exists(path)) {
+            fs.delete(path, true);
+        }
 
-    job.setMapOutputKeyClass(BytesWritable.class);
-    job.setMapOutputValueClass(Text.class);
+        TextOutputFormat.setOutputPath(job, path);
 
-    job.setNumReduceTasks(0);
+        job.waitForCompletion(true);
+        Assert.assertEquals(readCount, MapRead.readCount);
 
-    Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceOutput");
-    if( fs.exists(path) ) {
-      fs.delete(path, true);
+        return readRecords;
     }
 
-    TextOutputFormat.setOutputPath(job, path);
-
-    job.waitForCompletion(true);
-    Assert.assertEquals(readCount, MapRead.readCount);
 
-    return readRecords;
-  }
+    protected HCatSchema getTableSchema() throws Exception {
 
+        Configuration conf = new Configuration();
+        Job job = new Job(conf, "hcat mapreduce read schema test");
+        job.setJarByClass(this.getClass());
 
-  protected HCatSchema getTableSchema() throws Exception {
+        // input/output settings
+        job.setInputFormatClass(HCatInputFormat.class);
+        job.setOutputFormatClass(TextOutputFormat.class);
 
-    Configuration conf = new Configuration();
-    Job job = new Job(conf, "hcat mapreduce read schema test");
-    job.setJarByClass(this.getClass());
+        InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, null);
+        HCatInputFormat.setInput(job, inputJobInfo);
 
-    // input/output settings
-    job.setInputFormatClass(HCatInputFormat.class);
-    job.setOutputFormatClass(TextOutputFormat.class);
-
-    InputJobInfo inputJobInfo = InputJobInfo.create(dbName,tableName,null);
-    HCatInputFormat.setInput(job, inputJobInfo);
-
-    return HCatInputFormat.getTableSchema(job);
-  }
+        return HCatInputFormat.getTableSchema(job);
+    }
 
 }
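
The new asSingleMapTask parameter above controls how many map tasks the write job gets: a single small input file yields one split, while two input files yield two splits and therefore two map tasks, which is what allows the multi-task dynamic partitioning scenario from HCAT-490 to be reproduced in a local-mode test. A minimal sketch of that setup, with illustrative paths and job name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class TwoMapTaskSetup {
        static Job buildJob() throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "two map task demo");
            job.setInputFormatClass(TextInputFormat.class);
            // Each small input file becomes its own split, so two paths give two
            // map tasks that can both write records for the same partition value.
            TextInputFormat.setInputPaths(job, new Path("/tmp/in1"), new Path("/tmp/in2"));
            return job;
        }
    }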
 

Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1388607&r1=1388606&r2=1388607&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java Fri Sep 21 18:37:05 2012
@@ -34,129 +34,151 @@ import org.apache.hcatalog.data.DefaultH
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
 public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
 
-  private List<HCatRecord> writeRecords;
-  private List<HCatFieldSchema> dataColumns;
-  private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class);
-
-  @Override
-  protected void initialize() throws Exception {
-
-    tableName = "testHCatDynamicPartitionedTable";
-    generateWriteRecords(20,5,0);
-    generateDataColumns();
-  }
-
-  private void generateDataColumns() throws HCatException {
-    dataColumns = new ArrayList<HCatFieldSchema>();
-    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
-    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
-    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", Constants.STRING_TYPE_NAME, "")));
-  }
-
-  private void generateWriteRecords(int max, int mod,int offset) {
-    writeRecords = new ArrayList<HCatRecord>();
-
-    for(int i = 0;i < max;i++) {
-      List<Object> objList = new ArrayList<Object>();
-
-      objList.add(i);
-      objList.add("strvalue" + i);
-      objList.add(String.valueOf((i % mod)+offset));
-      writeRecords.add(new DefaultHCatRecord(objList));
-    }
-  }
-
-  @Override
-  protected List<FieldSchema> getPartitionKeys() {
-    List<FieldSchema> fields = new ArrayList<FieldSchema>();
-    fields.add(new FieldSchema("p1", Constants.STRING_TYPE_NAME, ""));
-    return fields;
-  }
-
-  @Override
-  protected List<FieldSchema> getTableColumns() {
-    List<FieldSchema> fields = new ArrayList<FieldSchema>();
-    fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
-    fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
-    return fields;
-  }
-
-
-  public void testHCatDynamicPartitionedTable() throws Exception {
-
-    generateWriteRecords(20,5,0);
-    runMRCreate(null, dataColumns, writeRecords, 20,true);
-
-    runMRRead(20);
-
-    //Read with partition filter
-    runMRRead(4, "p1 = \"0\"");
-    runMRRead(8, "p1 = \"1\" or p1 = \"3\"");
-    runMRRead(4, "p1 = \"4\"");
-
-    // read from hive to test
-
-    String query = "select * from " + tableName;
-    int retCode = driver.run(query).getResponseCode();
-
-    if( retCode != 0 ) {
-      throw new Exception("Error " + retCode + " running query " + query);
-    }
-
-    ArrayList<String> res = new ArrayList<String>();
-    driver.getResults(res);
-    assertEquals(20, res.size());
-
-
-    //Test for duplicate publish
-    IOException exc = null;
-    try {
-      generateWriteRecords(20,5,0);
-      Job job = runMRCreate(null, dataColumns, writeRecords, 20,false);
-      if (HcatTestUtils.isHadoop23()) {
-          new FileOutputCommitterContainer(job,null).cleanupJob(job);
-      }
-    } catch(IOException e) {
-      exc = e;
-    }
-
-    assertTrue(exc != null);
-    assertTrue(exc instanceof HCatException);
-    assertTrue( "Got exception of type ["+((HCatException) exc).getErrorType().toString()
-        + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED",
-        (ErrorType.ERROR_PUBLISHING_PARTITION == ((HCatException) exc).getErrorType())
-        || (ErrorType.ERROR_MOVE_FAILED == ((HCatException) exc).getErrorType())
+    private static List<HCatRecord> writeRecords;
+    private static List<HCatFieldSchema> dataColumns;
+    private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class);
+    private static final int NUM_RECORDS = 20;
+    private static final int NUM_PARTITIONS = 5;
+
+    @BeforeClass
+    public static void generateInputData() throws Exception {
+        tableName = "testHCatDynamicPartitionedTable";
+        generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+        generateDataColumns();
+    }
+
+    private static void generateDataColumns() throws HCatException {
+        dataColumns = new ArrayList<HCatFieldSchema>();
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", Constants.STRING_TYPE_NAME, "")));
+    }
+
+    private static void generateWriteRecords(int max, int mod, int offset) {
+        writeRecords = new ArrayList<HCatRecord>();
+
+        for (int i = 0; i < max; i++) {
+            List<Object> objList = new ArrayList<Object>();
+
+            objList.add(i);
+            objList.add("strvalue" + i);
+            objList.add(String.valueOf((i % mod) + offset));
+            writeRecords.add(new DefaultHCatRecord(objList));
+        }
+    }
+
+    @Override
+    protected List<FieldSchema> getPartitionKeys() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("p1", Constants.STRING_TYPE_NAME, ""));
+        return fields;
+    }
+
+    @Override
+    protected List<FieldSchema> getTableColumns() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+        fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+        return fields;
+    }
+
+    /**
+     * Run the dynamic partitioning test but with single map task
+     * @throws Exception
+     */
+    @Test
+    public void testHCatDynamicPartitionedTable() throws Exception {
+        runHCatDynamicPartitionedTable(true);
+    }
+
+    /**
+     * Run the dynamic partitioning test but with multiple map task. See HCATALOG-490
+     * @throws Exception
+     */
+    @Test
+    public void testHCatDynamicPartitionedTableMultipleTask() throws Exception {
+        runHCatDynamicPartitionedTable(false);
+    }
+
+    protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask) throws Exception {
+        generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+        runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask);
+
+        runMRRead(NUM_RECORDS);
+
+        //Read with partition filter
+        runMRRead(4, "p1 = \"0\"");
+        runMRRead(8, "p1 = \"1\" or p1 = \"3\"");
+        runMRRead(4, "p1 = \"4\"");
+
+        // read from hive to test
+
+        String query = "select * from " + tableName;
+        int retCode = driver.run(query).getResponseCode();
+
+        if (retCode != 0) {
+            throw new Exception("Error " + retCode + " running query " + query);
+        }
+
+        ArrayList<String> res = new ArrayList<String>();
+        driver.getResults(res);
+        assertEquals(NUM_RECORDS, res.size());
+
+
+        //Test for duplicate publish
+        IOException exc = null;
+        try {
+            generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+            Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false);
+            if (HcatTestUtils.isHadoop23()) {
+                new FileOutputCommitterContainer(job, null).cleanupJob(job);
+            }
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertTrue("Got exception of type [" + ((HCatException) exc).getErrorType().toString()
+            + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED",
+            (ErrorType.ERROR_PUBLISHING_PARTITION == ((HCatException) exc).getErrorType())
+                || (ErrorType.ERROR_MOVE_FAILED == ((HCatException) exc).getErrorType())
         );
-  }
+    }
 
-//TODO 1.0 miniCluster is slow this test times out, make it work
+    //TODO 1.0 miniCluster is slow this test times out, make it work
 // renaming test to make test framework skip it
-  public void _testHCatDynamicPartitionMaxPartitions() throws Exception {
-    HiveConf hc = new HiveConf(this.getClass());
+    public void _testHCatDynamicPartitionMaxPartitions() throws Exception {
+        HiveConf hc = new HiveConf(this.getClass());
 
-    int maxParts = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
-    LOG.info("Max partitions allowed = {}", maxParts);
+        int maxParts = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
+        LOG.info("Max partitions allowed = {}", maxParts);
 
-    IOException exc = null;
-    try {
-      generateWriteRecords(maxParts+5,maxParts+2,10);
-      runMRCreate(null,dataColumns,writeRecords,maxParts+5,false);
-    } catch(IOException e) {
-      exc = e;
-    }
-
-    if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
-      assertTrue(exc != null);
-      assertTrue(exc instanceof HCatException);
-      assertEquals(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, ((HCatException) exc).getErrorType());
-    }else{
-      assertTrue(exc == null);
-      runMRRead(maxParts+5);
+        IOException exc = null;
+        try {
+            generateWriteRecords(maxParts + 5, maxParts + 2, 10);
+            runMRCreate(null, dataColumns, writeRecords, maxParts + 5, false);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED) {
+            assertTrue(exc != null);
+            assertTrue(exc instanceof HCatException);
+            assertEquals(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, ((HCatException) exc).getErrorType());
+        } else {
+            assertTrue(exc == null);
+            runMRRead(maxParts + 5);
+        }
     }
-  }
 }
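
For reference, the expected row counts in the filtered reads above follow directly from how the test data is generated: NUM_RECORDS = 20 records are spread over NUM_PARTITIONS = 5 partition values via i % 5, so each partition value receives 4 rows. A small illustrative calculation:

    public class ExpectedCounts {
        public static void main(String[] args) {
            int numRecords = 20;     // NUM_RECORDS in the test
            int numPartitions = 5;   // NUM_PARTITIONS in the test
            int perPartition = numRecords / numPartitions;  // 4 -> runMRRead(4, "p1 = \"0\"")
            int twoPartitions = 2 * perPartition;           // 8 -> runMRRead(8, "p1 = \"1\" or p1 = \"3\"")
            System.out.println(perPartition + " " + twoPartitions);
        }
    }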

Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1388607&r1=1388606&r2=1388607&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java Fri Sep 21 18:37:05 2012
@@ -32,99 +32,105 @@ import org.apache.hcatalog.data.DefaultH
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestHCatNonPartitioned extends HCatMapReduceTest {
 
-  private List<HCatRecord> writeRecords;
-  List<HCatFieldSchema> partitionColumns;
+    private static List<HCatRecord> writeRecords;
+    static List<HCatFieldSchema> partitionColumns;
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
 
-  @Override
-  protected void initialize() throws HCatException {
+        dbName = null; //test if null dbName works ("default" is used)
+        tableName = "testHCatNonPartitionedTable";
 
-    dbName = null; //test if null dbName works ("default" is used)
-    tableName = "testHCatNonPartitionedTable";
+        writeRecords = new ArrayList<HCatRecord>();
 
-    writeRecords = new ArrayList<HCatRecord>();
+        for (int i = 0; i < 20; i++) {
+            List<Object> objList = new ArrayList<Object>();
 
-    for(int i = 0;i < 20;i++) {
-      List<Object> objList = new ArrayList<Object>();
+            objList.add(i);
+            objList.add("strvalue" + i);
+            writeRecords.add(new DefaultHCatRecord(objList));
+        }
 
-      objList.add(i);
-      objList.add("strvalue" + i);
-      writeRecords.add(new DefaultHCatRecord(objList));
+        partitionColumns = new ArrayList<HCatFieldSchema>();
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
     }
 
-    partitionColumns = new ArrayList<HCatFieldSchema>();
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
-  }
-
-  @Override
-  protected List<FieldSchema> getPartitionKeys() {
-    List<FieldSchema> fields = new ArrayList<FieldSchema>();
-    //empty list, non partitioned
-    return fields;
-  }
-
-  @Override
-  protected List<FieldSchema> getTableColumns() {
-    List<FieldSchema> fields = new ArrayList<FieldSchema>();
-    fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
-    fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
-    return fields;
-  }
-
-
-  public void testHCatNonPartitionedTable() throws Exception {
-
-    Map<String, String> partitionMap = new HashMap<String, String>();
-    runMRCreate(null, partitionColumns, writeRecords, 10,true);
-
-    //Test for duplicate publish
-    IOException exc = null;
-    try {
-      runMRCreate(null,  partitionColumns, writeRecords, 20,true);
-    } catch(IOException e) {
-      exc = e;
+    @Override
+    protected List<FieldSchema> getPartitionKeys() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        //empty list, non partitioned
+        return fields;
     }
 
-    assertTrue(exc != null);
-    assertTrue(exc instanceof HCatException);
-    assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType());
-
-    //Test for publish with invalid partition key name
-    exc = null;
-    partitionMap.clear();
-    partitionMap.put("px", "p1value2");
-
-    try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
-    } catch(IOException e) {
-      exc = e;
+    @Override
+    protected List<FieldSchema> getTableColumns() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+        fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+        return fields;
     }
 
-    assertTrue(exc != null);
-    assertTrue(exc instanceof HCatException);
-    assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
 
-    //Read should get 10 rows
-    runMRRead(10);
+    @Test
+    public void testHCatNonPartitionedTable() throws Exception {
 
-    hiveReadTest();
-  }
+        Map<String, String> partitionMap = new HashMap<String, String>();
+        runMRCreate(null, partitionColumns, writeRecords, 10, true);
 
-  //Test that data inserted through hcatoutputformat is readable from hive
-  private void hiveReadTest() throws Exception {
+        //Test for duplicate publish
+        IOException exc = null;
+        try {
+            runMRCreate(null, partitionColumns, writeRecords, 20, true);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType());
+
+        //Test for publish with invalid partition key name
+        exc = null;
+        partitionMap.clear();
+        partitionMap.put("px", "p1value2");
+
+        try {
+            runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
 
-    String query = "select * from " + tableName;
-    int retCode = driver.run(query).getResponseCode();
+        //Read should get 10 rows
+        runMRRead(10);
 
-    if( retCode != 0 ) {
-      throw new Exception("Error " + retCode + " running query " + query);
+        hiveReadTest();
     }
 
-    ArrayList<String> res = new ArrayList<String>();
-    driver.getResults(res);
-    assertEquals(10, res.size());
-  }
+    //Test that data inserted through hcatoutputformat is readable from hive
+    private void hiveReadTest() throws Exception {
+
+        String query = "select * from " + tableName;
+        int retCode = driver.run(query).getResponseCode();
+
+        if (retCode != 0) {
+            throw new Exception("Error " + retCode + " running query " + query);
+        }
+
+        ArrayList<String> res = new ArrayList<String>();
+        driver.getResults(res);
+        assertEquals(10, res.size());
+    }
 }

Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1388607&r1=1388606&r2=1388607&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java Fri Sep 21 18:37:05 2012
@@ -33,312 +33,318 @@ import org.apache.hcatalog.data.HCatReco
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestHCatPartitioned extends HCatMapReduceTest {
 
-  private List<HCatRecord> writeRecords;
-  private List<HCatFieldSchema> partitionColumns;
+    private static List<HCatRecord> writeRecords;
+    private static List<HCatFieldSchema> partitionColumns;
 
-  @Override
-  protected void initialize() throws Exception {
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
 
-    tableName = "testHCatPartitionedTable";
-    writeRecords = new ArrayList<HCatRecord>();
+        tableName = "testHCatPartitionedTable";
+        writeRecords = new ArrayList<HCatRecord>();
 
-    for(int i = 0;i < 20;i++) {
-      List<Object> objList = new ArrayList<Object>();
+        for (int i = 0; i < 20; i++) {
+            List<Object> objList = new ArrayList<Object>();
 
-      objList.add(i);
-      objList.add("strvalue" + i);
-      writeRecords.add(new DefaultHCatRecord(objList));
-    }
+            objList.add(i);
+            objList.add("strvalue" + i);
+            writeRecords.add(new DefaultHCatRecord(objList));
+        }
 
-    partitionColumns = new ArrayList<HCatFieldSchema>();
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
-  }
-
-
-  @Override
-  protected List<FieldSchema> getPartitionKeys() {
-    List<FieldSchema> fields = new ArrayList<FieldSchema>();
-    //Defining partition names in unsorted order
-    fields.add(new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, ""));
-    fields.add(new FieldSchema("part0", Constants.STRING_TYPE_NAME, ""));
-    return fields;
-  }
-
-  @Override
-  protected List<FieldSchema> getTableColumns() {
-    List<FieldSchema> fields = new ArrayList<FieldSchema>();
-    fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
-    fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
-    return fields;
-  }
-
-
-  public void testHCatPartitionedTable() throws Exception {
-
-    Map<String, String> partitionMap = new HashMap<String, String>();
-    partitionMap.put("part1", "p1value1");
-    partitionMap.put("part0", "p0value1");
-
-    runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
-
-    partitionMap.clear();
-    partitionMap.put("PART1", "p1value2");
-    partitionMap.put("PART0", "p0value2");
-
-    runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
-
-    //Test for duplicate publish
-    IOException exc = null;
-    try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
-    } catch(IOException e) {
-      exc = e;
+        partitionColumns = new ArrayList<HCatFieldSchema>();
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
     }
 
-    assertTrue(exc != null);
-    assertTrue(exc instanceof HCatException);
-    assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) exc).getErrorType());
-
-    //Test for publish with invalid partition key name
-    exc = null;
-    partitionMap.clear();
-    partitionMap.put("px1", "p1value2");
-    partitionMap.put("px0", "p0value2");
-
-    try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
-    } catch(IOException e) {
-      exc = e;
+
+    @Override
+    protected List<FieldSchema> getPartitionKeys() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        //Defining partition names in unsorted order
+        fields.add(new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, ""));
+        fields.add(new FieldSchema("part0", Constants.STRING_TYPE_NAME, ""));
+        return fields;
     }
 
-    assertTrue(exc != null);
-    assertTrue(exc instanceof HCatException);
-    assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType());
-
-    //Test for publish with missing partition key values
-    exc = null;
-    partitionMap.clear();
-    partitionMap.put("px", "p1value2");
-
-    try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
-    } catch(IOException e) {
-      exc = e;
+    @Override
+    protected List<FieldSchema> getTableColumns() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+        fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+        return fields;
     }
 
-    assertTrue(exc != null);
-    assertTrue(exc instanceof HCatException);
-    assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
 
+    @Test
+    public void testHCatPartitionedTable() throws Exception {
 
-    //Test for null partition value map
-    exc = null;
-    try {
-      runMRCreate(null, partitionColumns, writeRecords, 20,false);
-    } catch(IOException e) {
-      exc = e;
-    }
+        Map<String, String> partitionMap = new HashMap<String, String>();
+        partitionMap.put("part1", "p1value1");
+        partitionMap.put("part0", "p0value1");
+
+        runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
+
+        partitionMap.clear();
+        partitionMap.put("PART1", "p1value2");
+        partitionMap.put("PART0", "p0value2");
+
+        runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+
+        //Test for duplicate publish
+        IOException exc = null;
+        try {
+            runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) exc).getErrorType());
 
-    assertTrue(exc == null);
+        //Test for publish with invalid partition key name
+        exc = null;
+        partitionMap.clear();
+        partitionMap.put("px1", "p1value2");
+        partitionMap.put("px0", "p0value2");
+
+        try {
+            runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType());
+
+        //Test for publish with missing partition key values
+        exc = null;
+        partitionMap.clear();
+        partitionMap.put("px", "p1value2");
+
+        try {
+            runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
+
+
+        //Test for null partition value map
+        exc = null;
+        try {
+            runMRCreate(null, partitionColumns, writeRecords, 20, false);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc == null);
 //    assertTrue(exc instanceof HCatException);
 //    assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType());
-    // With Dynamic partitioning, this isn't an error that the keyValues specified didn't values
+        // With Dynamic partitioning, it isn't an error that the keyValues specified didn't provide values
 
-    //Read should get 10 + 20 rows
-    runMRRead(30);
+        //Read should get 10 + 20 rows
+        runMRRead(30);
 
-    //Read with partition filter
-    runMRRead(10, "part1 = \"p1value1\"");
-    runMRRead(20, "part1 = \"p1value2\"");
-    runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\"");
-    runMRRead(10, "part0 = \"p0value1\"");
-    runMRRead(20, "part0 = \"p0value2\"");
-    runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\"");
+        //Read with partition filter
+        runMRRead(10, "part1 = \"p1value1\"");
+        runMRRead(20, "part1 = \"p1value2\"");
+        runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\"");
+        runMRRead(10, "part0 = \"p0value1\"");
+        runMRRead(20, "part0 = \"p0value2\"");
+        runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\"");
 
-    tableSchemaTest();
-    columnOrderChangeTest();
-    hiveReadTest();
-  }
+        tableSchemaTest();
+        columnOrderChangeTest();
+        hiveReadTest();
+    }
 
 
-  //test that new columns gets added to table schema
-  private void tableSchemaTest() throws Exception {
+    //test that new columns get added to the table schema
+    private void tableSchemaTest() throws Exception {
 
-    HCatSchema tableSchema = getTableSchema();
+        HCatSchema tableSchema = getTableSchema();
 
-    assertEquals(4, tableSchema.getFields().size());
+        assertEquals(4, tableSchema.getFields().size());
 
-    //Update partition schema to have 3 fields
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
+        //Update partition schema to have 3 fields
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
 
-    writeRecords = new ArrayList<HCatRecord>();
+        writeRecords = new ArrayList<HCatRecord>();
 
-    for(int i = 0;i < 20;i++) {
-      List<Object> objList = new ArrayList<Object>();
+        for (int i = 0; i < 20; i++) {
+            List<Object> objList = new ArrayList<Object>();
 
-      objList.add(i);
-      objList.add("strvalue" + i);
-      objList.add("str2value" + i);
+            objList.add(i);
+            objList.add("strvalue" + i);
+            objList.add("str2value" + i);
 
-      writeRecords.add(new DefaultHCatRecord(objList));
-    }
+            writeRecords.add(new DefaultHCatRecord(objList));
+        }
 
-    Map<String, String> partitionMap = new HashMap<String, String>();
-    partitionMap.put("part1", "p1value5");
-    partitionMap.put("part0", "p0value5");
-
-    runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
-
-    tableSchema = getTableSchema();
-
-    //assert that c3 has got added to table schema
-    assertEquals(5, tableSchema.getFields().size());
-    assertEquals("c1", tableSchema.getFields().get(0).getName());
-    assertEquals("c2", tableSchema.getFields().get(1).getName());
-    assertEquals("c3", tableSchema.getFields().get(2).getName());
-    assertEquals("part1", tableSchema.getFields().get(3).getName());
-    assertEquals("part0", tableSchema.getFields().get(4).getName());
-
-    //Test that changing column data type fails
-    partitionMap.clear();
-    partitionMap.put("part1", "p1value6");
-    partitionMap.put("part0", "p0value6");
-
-    partitionColumns = new ArrayList<HCatFieldSchema>();
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.INT_TYPE_NAME, "")));
-
-    IOException exc = null;
-    try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
-    } catch(IOException e) {
-      exc = e;
-    }
+        Map<String, String> partitionMap = new HashMap<String, String>();
+        partitionMap.put("part1", "p1value5");
+        partitionMap.put("part0", "p0value5");
 
-    assertTrue(exc != null);
-    assertTrue(exc instanceof HCatException);
-    assertEquals(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, ((HCatException) exc).getErrorType());
-
-    //Test that partition key is not allowed in data
-    partitionColumns = new ArrayList<HCatFieldSchema>();
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("part1", Constants.STRING_TYPE_NAME, "")));
-
-    List<HCatRecord> recordsContainingPartitionCols = new ArrayList<HCatRecord>(20);
-    for(int i = 0;i < 20;i++) {
-      List<Object> objList = new ArrayList<Object>();
-
-      objList.add(i);
-      objList.add("c2value" + i);
-      objList.add("c3value" + i);
-      objList.add("p1value6");
+        runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
 
-      recordsContainingPartitionCols.add(new DefaultHCatRecord(objList));
-    }
+        tableSchema = getTableSchema();
 
-    exc = null;
-    try {
-      runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20,true);
-    } catch(IOException e) {
-      exc = e;
-    }
+        //assert that c3 has got added to table schema
+        assertEquals(5, tableSchema.getFields().size());
+        assertEquals("c1", tableSchema.getFields().get(0).getName());
+        assertEquals("c2", tableSchema.getFields().get(1).getName());
+        assertEquals("c3", tableSchema.getFields().get(2).getName());
+        assertEquals("part1", tableSchema.getFields().get(3).getName());
+        assertEquals("part0", tableSchema.getFields().get(4).getName());
 
-    List<HCatRecord> records= runMRRead(20,"part1 = \"p1value6\"");
-    assertEquals(20, records.size());
-    records= runMRRead(20,"part0 = \"p0value6\"");
-    assertEquals(20, records.size());
-    Integer i =0;
-    for(HCatRecord rec : records){
-      assertEquals(5, rec.size());
-      assertTrue(rec.get(0).equals(i));
-      assertTrue(rec.get(1).equals("c2value"+i));
-      assertTrue(rec.get(2).equals("c3value"+i));
-      assertTrue(rec.get(3).equals("p1value6"));
-      assertTrue(rec.get(4).equals("p0value6"));
-      i++;
-    }
-  }
+        //Test that changing column data type fails
+        partitionMap.clear();
+        partitionMap.put("part1", "p1value6");
+        partitionMap.put("part0", "p0value6");
 
-  //check behavior while change the order of columns
-  private void columnOrderChangeTest() throws Exception {
+        partitionColumns = new ArrayList<HCatFieldSchema>();
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.INT_TYPE_NAME, "")));
 
-    HCatSchema tableSchema = getTableSchema();
+        IOException exc = null;
+        try {
+            runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+        } catch (IOException e) {
+            exc = e;
+        }
 
-    assertEquals(5, tableSchema.getFields().size());
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertEquals(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, ((HCatException) exc).getErrorType());
 
-    partitionColumns = new ArrayList<HCatFieldSchema>();
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+        //Test that partition key is not allowed in data
+        partitionColumns = new ArrayList<HCatFieldSchema>();
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("part1", Constants.STRING_TYPE_NAME, "")));
 
+        List<HCatRecord> recordsContainingPartitionCols = new ArrayList<HCatRecord>(20);
+        for (int i = 0; i < 20; i++) {
+            List<Object> objList = new ArrayList<Object>();
 
-    writeRecords = new ArrayList<HCatRecord>();
+            objList.add(i);
+            objList.add("c2value" + i);
+            objList.add("c3value" + i);
+            objList.add("p1value6");
 
-    for(int i = 0;i < 10;i++) {
-      List<Object> objList = new ArrayList<Object>();
+            recordsContainingPartitionCols.add(new DefaultHCatRecord(objList));
+        }
 
-      objList.add(i);
-      objList.add("co strvalue" + i);
-      objList.add("co str2value" + i);
+        exc = null;
+        try {
+            runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20, true);
+        } catch (IOException e) {
+            exc = e;
+        }
 
-      writeRecords.add(new DefaultHCatRecord(objList));
+        List<HCatRecord> records = runMRRead(20, "part1 = \"p1value6\"");
+        assertEquals(20, records.size());
+        records = runMRRead(20, "part0 = \"p0value6\"");
+        assertEquals(20, records.size());
+        Integer i = 0;
+        for (HCatRecord rec : records) {
+            assertEquals(5, rec.size());
+            assertTrue(rec.get(0).equals(i));
+            assertTrue(rec.get(1).equals("c2value" + i));
+            assertTrue(rec.get(2).equals("c3value" + i));
+            assertTrue(rec.get(3).equals("p1value6"));
+            assertTrue(rec.get(4).equals("p0value6"));
+            i++;
+        }
     }
 
-    Map<String, String> partitionMap = new HashMap<String, String>();
-    partitionMap.put("part1", "p1value8");
-    partitionMap.put("part0", "p0value8");
-
-    Exception exc = null;
-    try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
-    } catch(IOException e) {
-      exc = e;
-    }
+    //check behavior while changing the order of columns
+    private void columnOrderChangeTest() throws Exception {
 
-    assertTrue(exc != null);
-    assertTrue(exc instanceof HCatException);
-    assertEquals(ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH, ((HCatException) exc).getErrorType());
+        HCatSchema tableSchema = getTableSchema();
 
+        assertEquals(5, tableSchema.getFields().size());
 
-    partitionColumns = new ArrayList<HCatFieldSchema>();
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
-    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+        partitionColumns = new ArrayList<HCatFieldSchema>();
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
 
-    writeRecords = new ArrayList<HCatRecord>();
 
-    for(int i = 0;i < 10;i++) {
-      List<Object> objList = new ArrayList<Object>();
+        writeRecords = new ArrayList<HCatRecord>();
 
-      objList.add(i);
-      objList.add("co strvalue" + i);
+        for (int i = 0; i < 10; i++) {
+            List<Object> objList = new ArrayList<Object>();
 
-      writeRecords.add(new DefaultHCatRecord(objList));
-    }
+            objList.add(i);
+            objList.add("co strvalue" + i);
+            objList.add("co str2value" + i);
+
+            writeRecords.add(new DefaultHCatRecord(objList));
+        }
+
+        Map<String, String> partitionMap = new HashMap<String, String>();
+        partitionMap.put("part1", "p1value8");
+        partitionMap.put("part0", "p0value8");
 
-    runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
+        Exception exc = null;
+        try {
+            runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
+        } catch (IOException e) {
+            exc = e;
+        }
 
-    //Read should get 10 + 20 + 10 + 10 + 20 rows
-    runMRRead(70);
-  }
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertEquals(ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH, ((HCatException) exc).getErrorType());
 
-  //Test that data inserted through hcatoutputformat is readable from hive
-  private void hiveReadTest() throws Exception {
 
-    String query = "select * from " + tableName;
-    int retCode = driver.run(query).getResponseCode();
+        partitionColumns = new ArrayList<HCatFieldSchema>();
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
 
-    if( retCode != 0 ) {
-      throw new Exception("Error " + retCode + " running query " + query);
+        writeRecords = new ArrayList<HCatRecord>();
+
+        for (int i = 0; i < 10; i++) {
+            List<Object> objList = new ArrayList<Object>();
+
+            objList.add(i);
+            objList.add("co strvalue" + i);
+
+            writeRecords.add(new DefaultHCatRecord(objList));
+        }
+
+        runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
+
+        //Read should get 10 + 20 + 10 + 10 + 20 rows
+        runMRRead(70);
     }
 
-    ArrayList<String> res = new ArrayList<String>();
-    driver.getResults(res);
-    assertEquals(70, res.size());
-  }
+    //Test that data inserted through hcatoutputformat is readable from hive
+    private void hiveReadTest() throws Exception {
+
+        String query = "select * from " + tableName;
+        int retCode = driver.run(query).getResponseCode();
+
+        if (retCode != 0) {
+            throw new Exception("Error " + retCode + " running query " + query);
+        }
+
+        ArrayList<String> res = new ArrayList<String>();
+        driver.getResults(res);
+        assertEquals(70, res.size());
+    }
 }
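
Both test classes repeat the same try/catch idiom for negative cases: attempt a publish that is expected to fail, capture the IOException, then assert that it is an HCatException carrying a specific ErrorType. A sketch of how that repetition could be collapsed into one private helper inside such a test class is shown below; the helper name expectHCatError is an illustrative assumption and is not part of this commit. It uses only members already exercised by these tests (runMRCreate, HCatException, ErrorType, and the JUnit asserts), so it would compile in the same class with the imports already present there.

    // Hypothetical helper (not part of this commit): run a publish attempt that is
    // expected to fail and assert the HCatalog error type of the resulting exception.
    private void expectHCatError(Map<String, String> partitionMap,
                                 List<HCatFieldSchema> columns,
                                 List<HCatRecord> records,
                                 int writeCount,
                                 ErrorType expectedType) throws Exception {
        IOException exc = null;
        try {
            runMRCreate(partitionMap, columns, records, writeCount, true);
        } catch (IOException e) {
            exc = e;
        }
        assertTrue(exc != null);
        assertTrue(exc instanceof HCatException);
        assertEquals(expectedType, ((HCatException) exc).getErrorType());
    }

A call such as expectHCatError(partitionMap, partitionColumns, writeRecords, 20, ErrorType.ERROR_DUPLICATE_PARTITION) would then replace each inlined try/catch block in the duplicate-publish and invalid-partition-key cases.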