Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/09/21 18:37:05 UTC
svn commit: r1388607 - in /incubator/hcatalog/branches/branch-0.4: ./
src/java/org/apache/hcatalog/mapreduce/
src/test/org/apache/hcatalog/mapreduce/
Author: toffer
Date: Fri Sep 21 18:37:05 2012
New Revision: 1388607
URL: http://svn.apache.org/viewvc?rev=1388607&view=rev
Log:
merged from trunk: HCAT-490 HCatStorer() throws error when the same partition key is present in records in more than one tasks running as part of the same job (amalakar via toffer)
Modified:
incubator/hcatalog/branches/branch-0.4/CHANGES.txt
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1388607&r1=1388606&r2=1388607&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Fri Sep 21 18:37:05 2012
@@ -63,6 +63,8 @@ Trunk (unreleased changes)
OPTIMIZATIONS
BUG FIXES
+ HCAT-490 HCatStorer() throws error when the same partition key is present in records in more than one tasks running as part of the same job (amalakar via toffer)
+
HCAT-494 MultiOutputFormat in 0.23 fails to setAliasConf() correctly. (mithun via toffer)
HCAT-507 e2e harness failing on 0.23 (toffer)
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java?rev=1388607&r1=1388606&r2=1388607&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java Fri Sep 21 18:37:05 2012
@@ -82,17 +82,17 @@ class FileRecordWriterContainer extends
*/
public FileRecordWriterContainer(org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
TaskAttemptContext context) throws IOException, InterruptedException {
- super(context,baseWriter);
+ super(context, baseWriter);
this.context = context;
jobInfo = HCatOutputFormat.getJobInfo(context);
storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
- serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),context.getConfiguration());
+ serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration());
objectInspector = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema());
try {
InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo);
} catch (SerDeException e) {
- throw new IOException("Failed to inialize SerDe",e);
+ throw new IOException("Failed to inialize SerDe", e);
}
// If partition columns occur in data, we want to remove them.
@@ -101,9 +101,9 @@ class FileRecordWriterContainer extends
dynamicPartCols = jobInfo.getPosOfDynPartCols();
maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
- if((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))){
+ if ((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))) {
throw new HCatException("It seems that setSchema() is not called on " +
- "HCatOutputFormat. Please make sure that method is called.");
+ "HCatOutputFormat. Please make sure that method is called.");
}
@@ -114,10 +114,9 @@ class FileRecordWriterContainer extends
this.dynamicContexts = null;
this.dynamicObjectInspectors = null;
this.dynamicOutputJobInfo = null;
- }
- else {
- this.baseDynamicSerDe = new HashMap<String,SerDe>();
- this.baseDynamicWriters = new HashMap<String,org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>>();
+ } else {
+ this.baseDynamicSerDe = new HashMap<String, SerDe>();
+ this.baseDynamicWriters = new HashMap<String, org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>>();
this.baseDynamicCommitters = new HashMap<String, org.apache.hadoop.mapred.OutputCommitter>();
this.dynamicContexts = new HashMap<String, org.apache.hadoop.mapred.TaskAttemptContext>();
this.dynamicObjectInspectors = new HashMap<String, ObjectInspector>();
@@ -134,17 +133,17 @@ class FileRecordWriterContainer extends
@Override
public void close(TaskAttemptContext context) throws IOException,
- InterruptedException {
+ InterruptedException {
Reporter reporter = InternalUtil.createReporter(context);
- if (dynamicPartitioningUsed){
- for (org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()){
+ if (dynamicPartitioningUsed) {
+ for (org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()) {
//We are in RecordWriter.close() make sense that the context would be TaskInputOutput
bwriter.close(reporter);
}
- for(Map.Entry<String,org.apache.hadoop.mapred.OutputCommitter>entry : baseDynamicCommitters.entrySet()) {
+ for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry : baseDynamicCommitters.entrySet()) {
org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
OutputCommitter baseOutputCommitter = entry.getValue();
- if (baseOutputCommitter.needsTaskCommit(currContext)){
+ if (baseOutputCommitter.needsTaskCommit(currContext)) {
baseOutputCommitter.commitTask(currContext);
}
org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currContext);
@@ -157,92 +156,96 @@ class FileRecordWriterContainer extends
@Override
public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
- InterruptedException {
+ InterruptedException {
org.apache.hadoop.mapred.RecordWriter localWriter;
ObjectInspector localObjectInspector;
SerDe localSerDe;
OutputJobInfo localJobInfo = null;
- if (dynamicPartitioningUsed){
+ if (dynamicPartitioningUsed) {
// calculate which writer to use from the remaining values - this needs to be done before we delete cols
List<String> dynamicPartValues = new ArrayList<String>();
- for (Integer colToAppend : dynamicPartCols){
+ for (Integer colToAppend : dynamicPartCols) {
dynamicPartValues.add(value.get(colToAppend).toString());
}
String dynKey = dynamicPartValues.toString();
- if (!baseDynamicWriters.containsKey(dynKey)){
- if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)){
+ if (!baseDynamicWriters.containsKey(dynKey)) {
+ if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) {
throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
- "Number of dynamic partitions being created "
- + "exceeds configured max allowable partitions["
- + maxDynamicPartitions
- + "], increase parameter ["
- + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
- + "] if needed.");
+ "Number of dynamic partitions being created "
+ + "exceeds configured max allowable partitions["
+ + maxDynamicPartitions
+ + "], increase parameter ["
+ + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+ + "] if needed.");
}
org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context);
configureDynamicStorageHandler(currTaskContext, dynamicPartValues);
- localJobInfo= HCatBaseOutputFormat.getJobInfo(currTaskContext);
+ localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext);
//setup serDe
SerDe currSerDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf());
try {
InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(), localJobInfo);
} catch (SerDeException e) {
- throw new IOException("Failed to initialize SerDe",e);
+ throw new IOException("Failed to initialize SerDe", e);
}
//create base OutputFormat
org.apache.hadoop.mapred.OutputFormat baseOF =
- ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf());
- //check outputSpecs
- baseOF.checkOutputSpecs(null,currTaskContext.getJobConf());
+ ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf());
//get Output Committer
- org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = currTaskContext.getJobConf().getOutputCommitter();
+ org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = currTaskContext.getJobConf().getOutputCommitter();
//create currJobContext the latest so it gets all the config changes
org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currTaskContext);
+
+ //We are skipping calling checkOutputSpecs() for each partition
+ //As it can throw a FileAlreadyExistsException when more than one mapper is writing to a partition
+ //See HCATALOG-490, also to avoid contacting the namenode for each new FileOutputFormat instance
+ //In general this should be ok for most FileOutputFormat implementations
+ //but may become an issue for cases when the method is used to perform other setup tasks
+
//setupJob()
baseOutputCommitter.setupJob(currJobContext);
//recreate to refresh jobConf of currTask context
currTaskContext =
- HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
- currTaskContext.getTaskAttemptID(),
- currTaskContext.getProgressible());
+ HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
+ currTaskContext.getTaskAttemptID(),
+ currTaskContext.getProgressible());
//set temp location
currTaskContext.getConfiguration().set("mapred.work.output.dir",
- new FileOutputCommitter(new Path(localJobInfo.getLocation()),currTaskContext).getWorkPath().toString());
+ new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext).getWorkPath().toString());
//setupTask()
baseOutputCommitter.setupTask(currTaskContext);
org.apache.hadoop.mapred.RecordWriter baseRecordWriter =
- baseOF.getRecordWriter(null,
- currTaskContext.getJobConf(),
- FileOutputFormat.getUniqueFile(currTaskContext, "part", ""),
- InternalUtil.createReporter(currTaskContext));
+ baseOF.getRecordWriter(null,
+ currTaskContext.getJobConf(),
+ FileOutputFormat.getUniqueFile(currTaskContext, "part", ""),
+ InternalUtil.createReporter(currTaskContext));
baseDynamicWriters.put(dynKey, baseRecordWriter);
- baseDynamicSerDe.put(dynKey,currSerDe);
- baseDynamicCommitters.put(dynKey,baseOutputCommitter);
- dynamicContexts.put(dynKey,currTaskContext);
- dynamicObjectInspectors.put(dynKey,InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
+ baseDynamicSerDe.put(dynKey, currSerDe);
+ baseDynamicCommitters.put(dynKey, baseOutputCommitter);
+ dynamicContexts.put(dynKey, currTaskContext);
+ dynamicObjectInspectors.put(dynKey, InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
dynamicOutputJobInfo.put(dynKey, HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey)));
}
-
+
localJobInfo = dynamicOutputJobInfo.get(dynKey);
localWriter = baseDynamicWriters.get(dynKey);
localSerDe = baseDynamicSerDe.get(dynKey);
localObjectInspector = dynamicObjectInspectors.get(dynKey);
- }
- else{
+ } else {
localJobInfo = jobInfo;
localWriter = getBaseRecordWriter();
localSerDe = serDe;
localObjectInspector = objectInspector;
}
- for(Integer colToDel : partColsToDel){
+ for (Integer colToDel : partColsToDel) {
value.remove(colToDel);
}
@@ -251,7 +254,7 @@ class FileRecordWriterContainer extends
try {
localWriter.write(NullWritable.get(), localSerDe.serialize(value.getAll(), localObjectInspector));
} catch (SerDeException e) {
- throw new IOException("Failed to serialize object",e);
+ throw new IOException("Failed to serialize object", e);
}
}
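
For context on the per-partition bookkeeping in the hunk above, here is a minimal standalone sketch (hypothetical class and names, not code from this commit) of how FileRecordWriterContainer.write() keys its cache of base writers on a record's dynamic partition values, and why a per-partition checkOutputSpecs() call is skipped: two map tasks can both encounter the same partition value, and a second checkOutputSpecs() against an already created partition directory is what produced the FileAlreadyExistsException described in HCAT-490.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DynamicPartitionWriterCacheSketch {

    /** Stand-in for the base RecordWriter created once per dynamic partition. */
    static class PartitionWriter {
        final String partitionKey;
        int rowsWritten = 0;

        PartitionWriter(String partitionKey) {
            this.partitionKey = partitionKey;
        }

        void write(List<Object> row) {
            rowsWritten++;
        }
    }

    private final Map<String, PartitionWriter> writers = new HashMap<String, PartitionWriter>();
    private final List<Integer> dynamicPartCols;  // positions of the partition columns in the record
    private final int maxDynamicPartitions;

    DynamicPartitionWriterCacheSketch(List<Integer> dynamicPartCols, int maxDynamicPartitions) {
        this.dynamicPartCols = dynamicPartCols;
        this.maxDynamicPartitions = maxDynamicPartitions;
    }

    void write(List<Object> record) {
        // Build the cache key from the dynamic partition values, mirroring
        // dynamicPartValues.toString() in the real code.
        List<String> dynamicPartValues = new ArrayList<String>();
        for (Integer col : dynamicPartCols) {
            dynamicPartValues.add(record.get(col).toString());
        }
        String dynKey = dynamicPartValues.toString();

        PartitionWriter writer = writers.get(dynKey);
        if (writer == null) {
            if (maxDynamicPartitions != -1 && writers.size() > maxDynamicPartitions) {
                throw new IllegalStateException("Too many dynamic partitions: " + writers.size());
            }
            // The real code sets up a SerDe, OutputCommitter and base RecordWriter here; it
            // deliberately does NOT call baseOF.checkOutputSpecs() per partition, because a
            // second task writing the same partition would find the directory already exists.
            writer = new PartitionWriter(dynKey);
            writers.put(dynKey, writer);
        }
        writer.write(record);
    }

    public static void main(String[] args) {
        // Records of the form (c1, c2, p1) where p1 = c1 % 5 is the dynamic partition column.
        DynamicPartitionWriterCacheSketch sketch =
                new DynamicPartitionWriterCacheSketch(Arrays.asList(2), 100);
        for (int i = 0; i < 20; i++) {
            sketch.write(Arrays.<Object>asList(i, "strvalue" + i, String.valueOf(i % 5)));
        }
        System.out.println("distinct partitions: " + sketch.writers.size()); // prints 5
    }
}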
Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1388607&r1=1388606&r2=1388607&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Fri Sep 21 18:37:05 2012
@@ -25,328 +25,342 @@ import java.util.List;
import java.util.Map;
import junit.framework.Assert;
-import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertTrue;
+
/**
* Test for HCatOutputFormat. Writes a partition using HCatOutputFormat and reads
* it back using HCatInputFormat, checks the column values and counts.
*/
-public abstract class HCatMapReduceTest extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(HCatMapReduceTest.class);
- protected String dbName = "default";
- protected String tableName = "testHCatMapReduceTable";
-
- protected String inputFormat = RCFileInputFormat.class.getName();
- protected String outputFormat = RCFileOutputFormat.class.getName();
- protected String serdeClass = ColumnarSerDe.class.getName();
-
- private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
- private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
-
- protected abstract void initialize() throws Exception;
-
- protected abstract List<FieldSchema> getPartitionKeys();
-
- protected abstract List<FieldSchema> getTableColumns();
+public abstract class HCatMapReduceTest extends HCatBaseTest {
- private HiveMetaStoreClient client;
- protected HiveConf hiveConf;
+ private static final Logger LOG = LoggerFactory.getLogger(HCatMapReduceTest.class);
+ protected static String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+ protected static String tableName = "testHCatMapReduceTable";
- private FileSystem fs;
- private String thriftUri = null;
+ protected String inputFormat = RCFileInputFormat.class.getName();
+ protected String outputFormat = RCFileOutputFormat.class.getName();
+ protected String serdeClass = ColumnarSerDe.class.getName();
- protected Driver driver;
+ private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
+ private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
- @Override
- protected void setUp() throws Exception {
- hiveConf = new HiveConf(this.getClass());
+ protected abstract List<FieldSchema> getPartitionKeys();
- //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook
- //is present only in the ql/test directory
- hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
- driver = new Driver(hiveConf);
- SessionState.start(new CliSessionState(hiveConf));
+ protected abstract List<FieldSchema> getTableColumns();
- thriftUri = System.getenv("HCAT_METASTORE_URI");
+ private static FileSystem fs;
- if( thriftUri != null ) {
- LOG.info("Using URI {}", thriftUri);
-
- hiveConf.set("hive.metastore.local", "false");
- hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
+ @BeforeClass
+ public static void setUpOneTime() throws Exception {
+ fs = new LocalFileSystem();
+ fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
+ MapCreate.writeCount = 0;
+ MapRead.readCount = 0;
}
- fs = new LocalFileSystem();
- fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
-
- initialize();
-
- client = new HiveMetaStoreClient(hiveConf, null);
- initTable();
- }
-
- @Override
- protected void tearDown() throws Exception {
- try {
- String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+ @After
+ public void deleteTable() throws Exception {
+ try {
+ String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
- client.dropTable(databaseName, tableName);
- } catch(Exception e) {
- e.printStackTrace();
- throw e;
+ client.dropTable(databaseName, tableName);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
}
- client.close();
- }
-
-
-
- private void initTable() throws Exception {
-
- String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
-
- try {
- client.dropTable(databaseName, tableName);
- } catch(Exception e) {
- } //can fail with NoSuchObjectException
-
-
- Table tbl = new Table();
- tbl.setDbName(databaseName);
- tbl.setTableName(tableName);
- tbl.setTableType("MANAGED_TABLE");
- StorageDescriptor sd = new StorageDescriptor();
-
- sd.setCols(getTableColumns());
- tbl.setPartitionKeys(getPartitionKeys());
-
- tbl.setSd(sd);
-
- sd.setBucketCols(new ArrayList<String>(2));
- sd.setSerdeInfo(new SerDeInfo());
- sd.getSerdeInfo().setName(tbl.getTableName());
- sd.getSerdeInfo().setParameters(new HashMap<String, String>());
- sd.getSerdeInfo().getParameters().put(
- org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
- sd.getSerdeInfo().setSerializationLib(serdeClass);
- sd.setInputFormat(inputFormat);
- sd.setOutputFormat(outputFormat);
+ @Before
+ public void disableHiveClientCache() throws IOException, MetaException {
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.setInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, 0);
+ // Hack to initialize cache with 0 expiry time causing it to return a new hive client every time
+ // Otherwise the cache doesn't play well with the second test method with the client gets closed() in the
+ // tearDown() of the previous test
+ HCatUtil.getHiveClient(hiveConf);
+ }
- Map<String, String> tableParams = new HashMap<String, String>();
- tbl.setParameters(tableParams);
+ @Before
+ public void createTable() throws Exception {
+ String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
- client.createTable(tbl);
- }
+ try {
+ client.dropTable(databaseName, tableName);
+ } catch (Exception e) {
+ } //can fail with NoSuchObjectException
+
+
+ Table tbl = new Table();
+ tbl.setDbName(databaseName);
+ tbl.setTableName(tableName);
+ tbl.setTableType("MANAGED_TABLE");
+ StorageDescriptor sd = new StorageDescriptor();
+
+ sd.setCols(getTableColumns());
+ tbl.setPartitionKeys(getPartitionKeys());
+
+ tbl.setSd(sd);
+
+ sd.setBucketCols(new ArrayList<String>(2));
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().getParameters().put(
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
+ sd.getSerdeInfo().setSerializationLib(serdeClass);
+ sd.setInputFormat(inputFormat);
+ sd.setOutputFormat(outputFormat);
- //Create test input file with specified number of rows
- private void createInputFile(Path path, int rowCount) throws IOException {
+ Map<String, String> tableParams = new HashMap<String, String>();
+ tbl.setParameters(tableParams);
- if( fs.exists(path) ) {
- fs.delete(path, true);
+ client.createTable(tbl);
}
- FSDataOutputStream os = fs.create(path);
+ //Create test input file with specified number of rows
+ private void createInputFile(Path path, int rowCount) throws IOException {
- for(int i = 0;i < rowCount;i++) {
- os.writeChars(i + "\n");
- }
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
- os.close();
- }
+ FSDataOutputStream os = fs.create(path);
- public static class MapCreate extends
- Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+ for (int i = 0; i < rowCount; i++) {
+ os.writeChars(i + "\n");
+ }
- static int writeCount = 0; //test will be in local mode
+ os.close();
+ }
- @Override
- public void map(LongWritable key, Text value, Context context
- ) throws IOException, InterruptedException {
- {
- try {
- HCatRecord rec = writeRecords.get(writeCount);
- context.write(null, rec);
- writeCount++;
+ public static class MapCreate extends
+ Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
- }catch(Exception e) {
+ static int writeCount = 0; //test will be in local mode
- e.printStackTrace(System.err); //print since otherwise exception is lost
- throw new IOException(e);
+ @Override
+ public void map(LongWritable key, Text value, Context context
+ ) throws IOException, InterruptedException {
+ {
+ try {
+ HCatRecord rec = writeRecords.get(writeCount);
+ context.write(null, rec);
+ writeCount++;
+
+ } catch (Exception e) {
+
+ e.printStackTrace(System.err); //print since otherwise exception is lost
+ throw new IOException(e);
+ }
+ }
}
- }
}
- }
- public static class MapRead extends
- Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
+ public static class MapRead extends
+ Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
- static int readCount = 0; //test will be in local mode
+ static int readCount = 0; //test will be in local mode
- @Override
- public void map(WritableComparable key, HCatRecord value, Context context
- ) throws IOException, InterruptedException {
- {
- try {
- readRecords.add(value);
- readCount++;
- } catch(Exception e) {
- e.printStackTrace(); //print since otherwise exception is lost
- throw new IOException(e);
+ @Override
+ public void map(WritableComparable key, HCatRecord value, Context context
+ ) throws IOException, InterruptedException {
+ {
+ try {
+ readRecords.add(value);
+ readCount++;
+ } catch (Exception e) {
+ e.printStackTrace(); //print since otherwise exception is lost
+ throw new IOException(e);
+ }
+ }
}
- }
}
- }
- Job runMRCreate(Map<String, String> partitionValues,
- List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
- int writeCount, boolean assertWrite) throws Exception {
+ Job runMRCreate(Map<String, String> partitionValues,
+ List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
+ int writeCount, boolean assertWrite) throws Exception {
+ return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite, true);
+ }
- writeRecords = records;
- MapCreate.writeCount = 0;
+ /**
+ * Run a local map reduce job to load data from in memory records to an HCatalog Table
+ * @param partitionValues
+ * @param partitionColumns
+ * @param records data to be written to HCatalog table
+ * @param writeCount
+ * @param assertWrite
+ * @param asSingleMapTask
+ * @return
+ * @throws Exception
+ */
+ Job runMRCreate(Map<String, String> partitionValues,
+ List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
+ int writeCount, boolean assertWrite, boolean asSingleMapTask) throws Exception {
+
+ writeRecords = records;
+ MapCreate.writeCount = 0;
+
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "hcat mapreduce write test");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(HCatMapReduceTest.MapCreate.class);
+
+ // input/output settings
+ job.setInputFormatClass(TextInputFormat.class);
+
+ if (asSingleMapTask) {
+ // One input path would mean only one map task
+ Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+ createInputFile(path, writeCount);
+ TextInputFormat.setInputPaths(job, path);
+ } else {
+ // Create two input paths so that two map tasks get triggered. There could be other ways
+ // to trigger two map tasks.
+ Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+ createInputFile(path, writeCount / 2);
- Configuration conf = new Configuration();
- Job job = new Job(conf, "hcat mapreduce write test");
- job.setJarByClass(this.getClass());
- job.setMapperClass(HCatMapReduceTest.MapCreate.class);
+ Path path2 = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput2");
+ createInputFile(path2, (writeCount - writeCount / 2));
- // input/output settings
- job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, path, path2);
+ }
- Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
- createInputFile(path, writeCount);
+ job.setOutputFormatClass(HCatOutputFormat.class);
- TextInputFormat.setInputPaths(job, path);
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
+ HCatOutputFormat.setOutput(job, outputJobInfo);
- job.setOutputFormatClass(HCatOutputFormat.class);
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(DefaultHCatRecord.class);
- OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
- HCatOutputFormat.setOutput(job, outputJobInfo);
+ job.setNumReduceTasks(0);
- job.setMapOutputKeyClass(BytesWritable.class);
- job.setMapOutputValueClass(DefaultHCatRecord.class);
+ HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
- job.setNumReduceTasks(0);
+ boolean success = job.waitForCompletion(true);
- HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
+ // Ensure counters are set when data has actually been read.
+ if (partitionValues != null) {
+ assertTrue(job.getCounters().getGroup("FileSystemCounters")
+ .findCounter("FILE_BYTES_READ").getValue() > 0);
+ }
- boolean success = job.waitForCompletion(true);
+ if (!HcatTestUtils.isHadoop23()) {
+ // Local mode outputcommitter hook is not invoked in Hadoop 1.x
+ if (success) {
+ new FileOutputCommitterContainer(job, null).commitJob(job);
+ } else {
+ new FileOutputCommitterContainer(job, null).abortJob(job, JobStatus.State.FAILED);
+ }
+ }
+ if (assertWrite) {
+ // we assert only if we expected to assert with this call.
+ Assert.assertEquals(writeCount, MapCreate.writeCount);
+ }
- // Ensure counters are set when data has actually been read.
- if (partitionValues != null) {
- assertTrue(job.getCounters().getGroup("FileSystemCounters")
- .findCounter("FILE_BYTES_READ").getValue() > 0);
+ return job;
}
- if (!HcatTestUtils.isHadoop23()) {
- // Local mode outputcommitter hook is not invoked in Hadoop 1.x
- if (success) {
- new FileOutputCommitterContainer(job,null).commitJob(job);
- } else {
- new FileOutputCommitterContainer(job,null).abortJob(job, JobStatus.State.FAILED);
- }
+ List<HCatRecord> runMRRead(int readCount) throws Exception {
+ return runMRRead(readCount, null);
}
- if (assertWrite){
- // we assert only if we expected to assert with this call.
- Assert.assertEquals(writeCount, MapCreate.writeCount);
- }
-
- return job;
- }
-
- List<HCatRecord> runMRRead(int readCount) throws Exception {
- return runMRRead(readCount, null);
- }
-
- List<HCatRecord> runMRRead(int readCount, String filter) throws Exception {
-
- MapRead.readCount = 0;
- readRecords.clear();
-
- Configuration conf = new Configuration();
- Job job = new Job(conf, "hcat mapreduce read test");
- job.setJarByClass(this.getClass());
- job.setMapperClass(HCatMapReduceTest.MapRead.class);
- // input/output settings
- job.setInputFormatClass(HCatInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
-
- InputJobInfo inputJobInfo = InputJobInfo.create(dbName,tableName,filter);
- HCatInputFormat.setInput(job, inputJobInfo);
+ /**
+ * Run a local map reduce job to read records from HCatalog table and verify if the count is as expected
+ * @param readCount
+ * @param filter
+ * @return
+ * @throws Exception
+ */
+ List<HCatRecord> runMRRead(int readCount, String filter) throws Exception {
+
+ MapRead.readCount = 0;
+ readRecords.clear();
+
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "hcat mapreduce read test");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(HCatMapReduceTest.MapRead.class);
+
+ // input/output settings
+ job.setInputFormatClass(HCatInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+
+ InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, filter);
+ HCatInputFormat.setInput(job, inputJobInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(Text.class);
+
+ job.setNumReduceTasks(0);
+
+ Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceOutput");
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
- job.setMapOutputKeyClass(BytesWritable.class);
- job.setMapOutputValueClass(Text.class);
+ TextOutputFormat.setOutputPath(job, path);
- job.setNumReduceTasks(0);
+ job.waitForCompletion(true);
+ Assert.assertEquals(readCount, MapRead.readCount);
- Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceOutput");
- if( fs.exists(path) ) {
- fs.delete(path, true);
+ return readRecords;
}
- TextOutputFormat.setOutputPath(job, path);
-
- job.waitForCompletion(true);
- Assert.assertEquals(readCount, MapRead.readCount);
- return readRecords;
- }
+ protected HCatSchema getTableSchema() throws Exception {
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "hcat mapreduce read schema test");
+ job.setJarByClass(this.getClass());
- protected HCatSchema getTableSchema() throws Exception {
+ // input/output settings
+ job.setInputFormatClass(HCatInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
- Configuration conf = new Configuration();
- Job job = new Job(conf, "hcat mapreduce read schema test");
- job.setJarByClass(this.getClass());
+ InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, null);
+ HCatInputFormat.setInput(job, inputJobInfo);
- // input/output settings
- job.setInputFormatClass(HCatInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
-
- InputJobInfo inputJobInfo = InputJobInfo.create(dbName,tableName,null);
- HCatInputFormat.setInput(job, inputJobInfo);
-
- return HCatInputFormat.getTableSchema(job);
- }
+ return HCatInputFormat.getTableSchema(job);
+ }
}
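
The new asSingleMapTask flag in runMRCreate() above works by splitting the input rows across two files, so that TextInputFormat yields two splits and therefore two map tasks that may both write records for the same dynamic partition value (the HCAT-490 scenario). A minimal standalone sketch of that idea (hypothetical class, not part of this commit):

import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;

public class TwoMapTaskInputSketch {

    // Mirrors createInputFile() from the test: one line per row; the line content is
    // irrelevant because MapCreate emits records from the in-memory writeRecords list.
    static void createInputFile(String path, int rowCount) throws IOException {
        Writer os = new FileWriter(path);
        try {
            for (int i = 0; i < rowCount; i++) {
                os.write(i + "\n");
            }
        } finally {
            os.close();
        }
    }

    public static void main(String[] args) throws IOException {
        int writeCount = 20;
        // asSingleMapTask == true: one input file, hence one split and one map task.
        // asSingleMapTask == false: two input files; TextInputFormat creates one split per
        // (small) file, so two map tasks run and can both write rows into the same
        // dynamic partition directory.
        createInputFile("testHCatMapReduceInput", writeCount / 2);
        createInputFile("testHCatMapReduceInput2", writeCount - writeCount / 2);
    }
}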
Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1388607&r1=1388606&r2=1388607&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java Fri Sep 21 18:37:05 2012
@@ -34,129 +34,151 @@ import org.apache.hcatalog.data.DefaultH
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
- private List<HCatRecord> writeRecords;
- private List<HCatFieldSchema> dataColumns;
- private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class);
-
- @Override
- protected void initialize() throws Exception {
-
- tableName = "testHCatDynamicPartitionedTable";
- generateWriteRecords(20,5,0);
- generateDataColumns();
- }
-
- private void generateDataColumns() throws HCatException {
- dataColumns = new ArrayList<HCatFieldSchema>();
- dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
- dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
- dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", Constants.STRING_TYPE_NAME, "")));
- }
-
- private void generateWriteRecords(int max, int mod,int offset) {
- writeRecords = new ArrayList<HCatRecord>();
-
- for(int i = 0;i < max;i++) {
- List<Object> objList = new ArrayList<Object>();
-
- objList.add(i);
- objList.add("strvalue" + i);
- objList.add(String.valueOf((i % mod)+offset));
- writeRecords.add(new DefaultHCatRecord(objList));
- }
- }
-
- @Override
- protected List<FieldSchema> getPartitionKeys() {
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- fields.add(new FieldSchema("p1", Constants.STRING_TYPE_NAME, ""));
- return fields;
- }
-
- @Override
- protected List<FieldSchema> getTableColumns() {
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
- fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
- return fields;
- }
-
-
- public void testHCatDynamicPartitionedTable() throws Exception {
-
- generateWriteRecords(20,5,0);
- runMRCreate(null, dataColumns, writeRecords, 20,true);
-
- runMRRead(20);
-
- //Read with partition filter
- runMRRead(4, "p1 = \"0\"");
- runMRRead(8, "p1 = \"1\" or p1 = \"3\"");
- runMRRead(4, "p1 = \"4\"");
-
- // read from hive to test
-
- String query = "select * from " + tableName;
- int retCode = driver.run(query).getResponseCode();
-
- if( retCode != 0 ) {
- throw new Exception("Error " + retCode + " running query " + query);
- }
-
- ArrayList<String> res = new ArrayList<String>();
- driver.getResults(res);
- assertEquals(20, res.size());
-
-
- //Test for duplicate publish
- IOException exc = null;
- try {
- generateWriteRecords(20,5,0);
- Job job = runMRCreate(null, dataColumns, writeRecords, 20,false);
- if (HcatTestUtils.isHadoop23()) {
- new FileOutputCommitterContainer(job,null).cleanupJob(job);
- }
- } catch(IOException e) {
- exc = e;
- }
-
- assertTrue(exc != null);
- assertTrue(exc instanceof HCatException);
- assertTrue( "Got exception of type ["+((HCatException) exc).getErrorType().toString()
- + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED",
- (ErrorType.ERROR_PUBLISHING_PARTITION == ((HCatException) exc).getErrorType())
- || (ErrorType.ERROR_MOVE_FAILED == ((HCatException) exc).getErrorType())
+ private static List<HCatRecord> writeRecords;
+ private static List<HCatFieldSchema> dataColumns;
+ private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class);
+ private static final int NUM_RECORDS = 20;
+ private static final int NUM_PARTITIONS = 5;
+
+ @BeforeClass
+ public static void generateInputData() throws Exception {
+ tableName = "testHCatDynamicPartitionedTable";
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+ generateDataColumns();
+ }
+
+ private static void generateDataColumns() throws HCatException {
+ dataColumns = new ArrayList<HCatFieldSchema>();
+ dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+ dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+ dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", Constants.STRING_TYPE_NAME, "")));
+ }
+
+ private static void generateWriteRecords(int max, int mod, int offset) {
+ writeRecords = new ArrayList<HCatRecord>();
+
+ for (int i = 0; i < max; i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("strvalue" + i);
+ objList.add(String.valueOf((i % mod) + offset));
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+ }
+
+ @Override
+ protected List<FieldSchema> getPartitionKeys() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("p1", Constants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+ @Override
+ protected List<FieldSchema> getTableColumns() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+ fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+ /**
+ * Run the dynamic partitioning test but with single map task
+ * @throws Exception
+ */
+ @Test
+ public void testHCatDynamicPartitionedTable() throws Exception {
+ runHCatDynamicPartitionedTable(true);
+ }
+
+ /**
+ * Run the dynamic partitioning test but with multiple map task. See HCATALOG-490
+ * @throws Exception
+ */
+ @Test
+ public void testHCatDynamicPartitionedTableMultipleTask() throws Exception {
+ runHCatDynamicPartitionedTable(false);
+ }
+
+ protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask) throws Exception {
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+ runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask);
+
+ runMRRead(NUM_RECORDS);
+
+ //Read with partition filter
+ runMRRead(4, "p1 = \"0\"");
+ runMRRead(8, "p1 = \"1\" or p1 = \"3\"");
+ runMRRead(4, "p1 = \"4\"");
+
+ // read from hive to test
+
+ String query = "select * from " + tableName;
+ int retCode = driver.run(query).getResponseCode();
+
+ if (retCode != 0) {
+ throw new Exception("Error " + retCode + " running query " + query);
+ }
+
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ assertEquals(NUM_RECORDS, res.size());
+
+
+ //Test for duplicate publish
+ IOException exc = null;
+ try {
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+ Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false);
+ if (HcatTestUtils.isHadoop23()) {
+ new FileOutputCommitterContainer(job, null).cleanupJob(job);
+ }
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertTrue("Got exception of type [" + ((HCatException) exc).getErrorType().toString()
+ + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED",
+ (ErrorType.ERROR_PUBLISHING_PARTITION == ((HCatException) exc).getErrorType())
+ || (ErrorType.ERROR_MOVE_FAILED == ((HCatException) exc).getErrorType())
);
- }
+ }
-//TODO 1.0 miniCluster is slow this test times out, make it work
+ //TODO 1.0 miniCluster is slow this test times out, make it work
// renaming test to make test framework skip it
- public void _testHCatDynamicPartitionMaxPartitions() throws Exception {
- HiveConf hc = new HiveConf(this.getClass());
+ public void _testHCatDynamicPartitionMaxPartitions() throws Exception {
+ HiveConf hc = new HiveConf(this.getClass());
- int maxParts = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
- LOG.info("Max partitions allowed = {}", maxParts);
+ int maxParts = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
+ LOG.info("Max partitions allowed = {}", maxParts);
- IOException exc = null;
- try {
- generateWriteRecords(maxParts+5,maxParts+2,10);
- runMRCreate(null,dataColumns,writeRecords,maxParts+5,false);
- } catch(IOException e) {
- exc = e;
- }
-
- if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
- assertTrue(exc != null);
- assertTrue(exc instanceof HCatException);
- assertEquals(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, ((HCatException) exc).getErrorType());
- }else{
- assertTrue(exc == null);
- runMRRead(maxParts+5);
+ IOException exc = null;
+ try {
+ generateWriteRecords(maxParts + 5, maxParts + 2, 10);
+ runMRCreate(null, dataColumns, writeRecords, maxParts + 5, false);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED) {
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, ((HCatException) exc).getErrorType());
+ } else {
+ assertTrue(exc == null);
+ runMRRead(maxParts + 5);
+ }
}
- }
}
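
The expected counts in runHCatDynamicPartitionedTable() follow directly from how generateWriteRecords() assigns partition values: with NUM_RECORDS = 20 and NUM_PARTITIONS = 5, p1 = String.valueOf(i % 5), so each partition receives 4 rows, which yields 4 records for p1 = "0" and 8 for p1 = "1" or p1 = "3". A small standalone check of that arithmetic (hypothetical class, not part of this commit):

import java.util.Map;
import java.util.TreeMap;

public class PartitionCountSketch {
    public static void main(String[] args) {
        int numRecords = 20;   // NUM_RECORDS in the test
        int numPartitions = 5; // NUM_PARTITIONS in the test
        Map<String, Integer> rowsPerPartition = new TreeMap<String, Integer>();
        for (int i = 0; i < numRecords; i++) {
            // Same expression as generateWriteRecords() with offset 0.
            String p1 = String.valueOf(i % numPartitions);
            Integer count = rowsPerPartition.get(p1);
            rowsPerPartition.put(p1, count == null ? 1 : count + 1);
        }
        System.out.println(rowsPerPartition); // prints {0=4, 1=4, 2=4, 3=4, 4=4}
    }
}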
Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1388607&r1=1388606&r2=1388607&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java Fri Sep 21 18:37:05 2012
@@ -32,99 +32,105 @@ import org.apache.hcatalog.data.DefaultH
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class TestHCatNonPartitioned extends HCatMapReduceTest {
- private List<HCatRecord> writeRecords;
- List<HCatFieldSchema> partitionColumns;
+ private static List<HCatRecord> writeRecords;
+ static List<HCatFieldSchema> partitionColumns;
+
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
- @Override
- protected void initialize() throws HCatException {
+ dbName = null; //test if null dbName works ("default" is used)
+ tableName = "testHCatNonPartitionedTable";
- dbName = null; //test if null dbName works ("default" is used)
- tableName = "testHCatNonPartitionedTable";
+ writeRecords = new ArrayList<HCatRecord>();
- writeRecords = new ArrayList<HCatRecord>();
+ for (int i = 0; i < 20; i++) {
+ List<Object> objList = new ArrayList<Object>();
- for(int i = 0;i < 20;i++) {
- List<Object> objList = new ArrayList<Object>();
+ objList.add(i);
+ objList.add("strvalue" + i);
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
- objList.add(i);
- objList.add("strvalue" + i);
- writeRecords.add(new DefaultHCatRecord(objList));
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
}
- partitionColumns = new ArrayList<HCatFieldSchema>();
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
- }
-
- @Override
- protected List<FieldSchema> getPartitionKeys() {
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- //empty list, non partitioned
- return fields;
- }
-
- @Override
- protected List<FieldSchema> getTableColumns() {
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
- fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
- return fields;
- }
-
-
- public void testHCatNonPartitionedTable() throws Exception {
-
- Map<String, String> partitionMap = new HashMap<String, String>();
- runMRCreate(null, partitionColumns, writeRecords, 10,true);
-
- //Test for duplicate publish
- IOException exc = null;
- try {
- runMRCreate(null, partitionColumns, writeRecords, 20,true);
- } catch(IOException e) {
- exc = e;
+ @Override
+ protected List<FieldSchema> getPartitionKeys() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ //empty list, non partitioned
+ return fields;
}
- assertTrue(exc != null);
- assertTrue(exc instanceof HCatException);
- assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType());
-
- //Test for publish with invalid partition key name
- exc = null;
- partitionMap.clear();
- partitionMap.put("px", "p1value2");
-
- try {
- runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
- } catch(IOException e) {
- exc = e;
+ @Override
+ protected List<FieldSchema> getTableColumns() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+ fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+ return fields;
}
- assertTrue(exc != null);
- assertTrue(exc instanceof HCatException);
- assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
- //Read should get 10 rows
- runMRRead(10);
+ @Test
+ public void testHCatNonPartitionedTable() throws Exception {
- hiveReadTest();
- }
+ Map<String, String> partitionMap = new HashMap<String, String>();
+ runMRCreate(null, partitionColumns, writeRecords, 10, true);
- //Test that data inserted through hcatoutputformat is readable from hive
- private void hiveReadTest() throws Exception {
+ //Test for duplicate publish
+ IOException exc = null;
+ try {
+ runMRCreate(null, partitionColumns, writeRecords, 20, true);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType());
+
+ //Test for publish with invalid partition key name
+ exc = null;
+ partitionMap.clear();
+ partitionMap.put("px", "p1value2");
+
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
- String query = "select * from " + tableName;
- int retCode = driver.run(query).getResponseCode();
+ //Read should get 10 rows
+ runMRRead(10);
- if( retCode != 0 ) {
- throw new Exception("Error " + retCode + " running query " + query);
+ hiveReadTest();
}
- ArrayList<String> res = new ArrayList<String>();
- driver.getResults(res);
- assertEquals(10, res.size());
- }
+ //Test that data inserted through hcatoutputformat is readable from hive
+ private void hiveReadTest() throws Exception {
+
+ String query = "select * from " + tableName;
+ int retCode = driver.run(query).getResponseCode();
+
+ if (retCode != 0) {
+ throw new Exception("Error " + retCode + " running query " + query);
+ }
+
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ assertEquals(10, res.size());
+ }
}
Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1388607&r1=1388606&r2=1388607&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java Fri Sep 21 18:37:05 2012
@@ -33,312 +33,318 @@ import org.apache.hcatalog.data.HCatReco
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class TestHCatPartitioned extends HCatMapReduceTest {
- private List<HCatRecord> writeRecords;
- private List<HCatFieldSchema> partitionColumns;
+ private static List<HCatRecord> writeRecords;
+ private static List<HCatFieldSchema> partitionColumns;
- @Override
- protected void initialize() throws Exception {
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
- tableName = "testHCatPartitionedTable";
- writeRecords = new ArrayList<HCatRecord>();
+ tableName = "testHCatPartitionedTable";
+ writeRecords = new ArrayList<HCatRecord>();
- for(int i = 0;i < 20;i++) {
- List<Object> objList = new ArrayList<Object>();
+ for (int i = 0; i < 20; i++) {
+ List<Object> objList = new ArrayList<Object>();
- objList.add(i);
- objList.add("strvalue" + i);
- writeRecords.add(new DefaultHCatRecord(objList));
- }
+ objList.add(i);
+ objList.add("strvalue" + i);
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
- partitionColumns = new ArrayList<HCatFieldSchema>();
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
- }
-
-
- @Override
- protected List<FieldSchema> getPartitionKeys() {
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- //Defining partition names in unsorted order
- fields.add(new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, ""));
- fields.add(new FieldSchema("part0", Constants.STRING_TYPE_NAME, ""));
- return fields;
- }
-
- @Override
- protected List<FieldSchema> getTableColumns() {
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
- fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
- return fields;
- }
-
-
- public void testHCatPartitionedTable() throws Exception {
-
- Map<String, String> partitionMap = new HashMap<String, String>();
- partitionMap.put("part1", "p1value1");
- partitionMap.put("part0", "p0value1");
-
- runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
-
- partitionMap.clear();
- partitionMap.put("PART1", "p1value2");
- partitionMap.put("PART0", "p0value2");
-
- runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
-
- //Test for duplicate publish
- IOException exc = null;
- try {
- runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
- } catch(IOException e) {
- exc = e;
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
}
- assertTrue(exc != null);
- assertTrue(exc instanceof HCatException);
- assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) exc).getErrorType());
-
- //Test for publish with invalid partition key name
- exc = null;
- partitionMap.clear();
- partitionMap.put("px1", "p1value2");
- partitionMap.put("px0", "p0value2");
-
- try {
- runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
- } catch(IOException e) {
- exc = e;
+
+ @Override
+ protected List<FieldSchema> getPartitionKeys() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ //Defining partition names in unsorted order
+ fields.add(new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, ""));
+ fields.add(new FieldSchema("part0", Constants.STRING_TYPE_NAME, ""));
+ return fields;
}
- assertTrue(exc != null);
- assertTrue(exc instanceof HCatException);
- assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType());
-
- //Test for publish with missing partition key values
- exc = null;
- partitionMap.clear();
- partitionMap.put("px", "p1value2");
-
- try {
- runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
- } catch(IOException e) {
- exc = e;
+ @Override
+ protected List<FieldSchema> getTableColumns() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+ fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+ return fields;
}
- assertTrue(exc != null);
- assertTrue(exc instanceof HCatException);
- assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
+ @Test
+ public void testHCatPartitionedTable() throws Exception {
- //Test for null partition value map
- exc = null;
- try {
- runMRCreate(null, partitionColumns, writeRecords, 20,false);
- } catch(IOException e) {
- exc = e;
- }
+ Map<String, String> partitionMap = new HashMap<String, String>();
+ partitionMap.put("part1", "p1value1");
+ partitionMap.put("part0", "p0value1");
+
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
+
+ partitionMap.clear();
+ partitionMap.put("PART1", "p1value2");
+ partitionMap.put("PART0", "p0value2");
+
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+
+ //Test for duplicate publish
+ IOException exc = null;
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) exc).getErrorType());
- assertTrue(exc == null);
+ //Test for publish with invalid partition key name
+ exc = null;
+ partitionMap.clear();
+ partitionMap.put("px1", "p1value2");
+ partitionMap.put("px0", "p0value2");
+
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType());
+
+ //Test for publish with missing partition key values
+ exc = null;
+ partitionMap.clear();
+ partitionMap.put("px", "p1value2");
+
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
+
+
+ //Test for null partition value map
+ exc = null;
+ try {
+ runMRCreate(null, partitionColumns, writeRecords, 20, false);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc == null);
// assertTrue(exc instanceof HCatException);
// assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType());
- // With Dynamic partitioning, this isn't an error that the keyValues specified didn't values
+ // With Dynamic partitioning, this isn't an error that the keyValues specified didn't values
- //Read should get 10 + 20 rows
- runMRRead(30);
+ //Read should get 10 + 20 rows
+ runMRRead(30);
- //Read with partition filter
- runMRRead(10, "part1 = \"p1value1\"");
- runMRRead(20, "part1 = \"p1value2\"");
- runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\"");
- runMRRead(10, "part0 = \"p0value1\"");
- runMRRead(20, "part0 = \"p0value2\"");
- runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\"");
+ //Read with partition filter
+ runMRRead(10, "part1 = \"p1value1\"");
+ runMRRead(20, "part1 = \"p1value2\"");
+ runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\"");
+ runMRRead(10, "part0 = \"p0value1\"");
+ runMRRead(20, "part0 = \"p0value2\"");
+ runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\"");
- tableSchemaTest();
- columnOrderChangeTest();
- hiveReadTest();
- }
+ tableSchemaTest();
+ columnOrderChangeTest();
+ hiveReadTest();
+ }
- //test that new columns gets added to table schema
- private void tableSchemaTest() throws Exception {
+ //test that new columns get added to the table schema
+ private void tableSchemaTest() throws Exception {
- HCatSchema tableSchema = getTableSchema();
+ HCatSchema tableSchema = getTableSchema();
- assertEquals(4, tableSchema.getFields().size());
+ assertEquals(4, tableSchema.getFields().size());
- //Update partition schema to have 3 fields
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
+ //Update partition schema to have 3 fields
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
- writeRecords = new ArrayList<HCatRecord>();
+ writeRecords = new ArrayList<HCatRecord>();
- for(int i = 0;i < 20;i++) {
- List<Object> objList = new ArrayList<Object>();
+ for (int i = 0; i < 20; i++) {
+ List<Object> objList = new ArrayList<Object>();
- objList.add(i);
- objList.add("strvalue" + i);
- objList.add("str2value" + i);
+ objList.add(i);
+ objList.add("strvalue" + i);
+ objList.add("str2value" + i);
- writeRecords.add(new DefaultHCatRecord(objList));
- }
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
- Map<String, String> partitionMap = new HashMap<String, String>();
- partitionMap.put("part1", "p1value5");
- partitionMap.put("part0", "p0value5");
-
- runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
-
- tableSchema = getTableSchema();
-
- //assert that c3 has got added to table schema
- assertEquals(5, tableSchema.getFields().size());
- assertEquals("c1", tableSchema.getFields().get(0).getName());
- assertEquals("c2", tableSchema.getFields().get(1).getName());
- assertEquals("c3", tableSchema.getFields().get(2).getName());
- assertEquals("part1", tableSchema.getFields().get(3).getName());
- assertEquals("part0", tableSchema.getFields().get(4).getName());
-
- //Test that changing column data type fails
- partitionMap.clear();
- partitionMap.put("part1", "p1value6");
- partitionMap.put("part0", "p0value6");
-
- partitionColumns = new ArrayList<HCatFieldSchema>();
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.INT_TYPE_NAME, "")));
-
- IOException exc = null;
- try {
- runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
- } catch(IOException e) {
- exc = e;
- }
+ Map<String, String> partitionMap = new HashMap<String, String>();
+ partitionMap.put("part1", "p1value5");
+ partitionMap.put("part0", "p0value5");
- assertTrue(exc != null);
- assertTrue(exc instanceof HCatException);
- assertEquals(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, ((HCatException) exc).getErrorType());
-
- //Test that partition key is not allowed in data
- partitionColumns = new ArrayList<HCatFieldSchema>();
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("part1", Constants.STRING_TYPE_NAME, "")));
-
- List<HCatRecord> recordsContainingPartitionCols = new ArrayList<HCatRecord>(20);
- for(int i = 0;i < 20;i++) {
- List<Object> objList = new ArrayList<Object>();
-
- objList.add(i);
- objList.add("c2value" + i);
- objList.add("c3value" + i);
- objList.add("p1value6");
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
- recordsContainingPartitionCols.add(new DefaultHCatRecord(objList));
- }
+ tableSchema = getTableSchema();
- exc = null;
- try {
- runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20,true);
- } catch(IOException e) {
- exc = e;
- }
+ //assert that c3 has been added to the table schema
+ assertEquals(5, tableSchema.getFields().size());
+ assertEquals("c1", tableSchema.getFields().get(0).getName());
+ assertEquals("c2", tableSchema.getFields().get(1).getName());
+ assertEquals("c3", tableSchema.getFields().get(2).getName());
+ assertEquals("part1", tableSchema.getFields().get(3).getName());
+ assertEquals("part0", tableSchema.getFields().get(4).getName());
- List<HCatRecord> records= runMRRead(20,"part1 = \"p1value6\"");
- assertEquals(20, records.size());
- records= runMRRead(20,"part0 = \"p0value6\"");
- assertEquals(20, records.size());
- Integer i =0;
- for(HCatRecord rec : records){
- assertEquals(5, rec.size());
- assertTrue(rec.get(0).equals(i));
- assertTrue(rec.get(1).equals("c2value"+i));
- assertTrue(rec.get(2).equals("c3value"+i));
- assertTrue(rec.get(3).equals("p1value6"));
- assertTrue(rec.get(4).equals("p0value6"));
- i++;
- }
- }
+ //Test that changing column data type fails
+ partitionMap.clear();
+ partitionMap.put("part1", "p1value6");
+ partitionMap.put("part0", "p0value6");
- //check behavior while change the order of columns
- private void columnOrderChangeTest() throws Exception {
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.INT_TYPE_NAME, "")));
- HCatSchema tableSchema = getTableSchema();
+ IOException exc = null;
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ } catch (IOException e) {
+ exc = e;
+ }
- assertEquals(5, tableSchema.getFields().size());
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, ((HCatException) exc).getErrorType());
- partitionColumns = new ArrayList<HCatFieldSchema>();
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+ //Test that a partition key column is not allowed in the data
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("part1", Constants.STRING_TYPE_NAME, "")));
+ List<HCatRecord> recordsContainingPartitionCols = new ArrayList<HCatRecord>(20);
+ for (int i = 0; i < 20; i++) {
+ List<Object> objList = new ArrayList<Object>();
- writeRecords = new ArrayList<HCatRecord>();
+ objList.add(i);
+ objList.add("c2value" + i);
+ objList.add("c3value" + i);
+ objList.add("p1value6");
- for(int i = 0;i < 10;i++) {
- List<Object> objList = new ArrayList<Object>();
+ recordsContainingPartitionCols.add(new DefaultHCatRecord(objList));
+ }
- objList.add(i);
- objList.add("co strvalue" + i);
- objList.add("co str2value" + i);
+ exc = null;
+ try {
+ runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20, true);
+ } catch (IOException e) {
+ exc = e;
+ }
- writeRecords.add(new DefaultHCatRecord(objList));
+ List<HCatRecord> records = runMRRead(20, "part1 = \"p1value6\"");
+ assertEquals(20, records.size());
+ records = runMRRead(20, "part0 = \"p0value6\"");
+ assertEquals(20, records.size());
+ Integer i = 0;
+ for (HCatRecord rec : records) {
+ assertEquals(5, rec.size());
+ assertTrue(rec.get(0).equals(i));
+ assertTrue(rec.get(1).equals("c2value" + i));
+ assertTrue(rec.get(2).equals("c3value" + i));
+ assertTrue(rec.get(3).equals("p1value6"));
+ assertTrue(rec.get(4).equals("p0value6"));
+ i++;
+ }
}
- Map<String, String> partitionMap = new HashMap<String, String>();
- partitionMap.put("part1", "p1value8");
- partitionMap.put("part0", "p0value8");
-
- Exception exc = null;
- try {
- runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
- } catch(IOException e) {
- exc = e;
- }
+ //check behavior when the order of columns is changed
+ private void columnOrderChangeTest() throws Exception {
- assertTrue(exc != null);
- assertTrue(exc instanceof HCatException);
- assertEquals(ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH, ((HCatException) exc).getErrorType());
+ HCatSchema tableSchema = getTableSchema();
+ assertEquals(5, tableSchema.getFields().size());
- partitionColumns = new ArrayList<HCatFieldSchema>();
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
- partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
- writeRecords = new ArrayList<HCatRecord>();
- for(int i = 0;i < 10;i++) {
- List<Object> objList = new ArrayList<Object>();
+ writeRecords = new ArrayList<HCatRecord>();
- objList.add(i);
- objList.add("co strvalue" + i);
+ for (int i = 0; i < 10; i++) {
+ List<Object> objList = new ArrayList<Object>();
- writeRecords.add(new DefaultHCatRecord(objList));
- }
+ objList.add(i);
+ objList.add("co strvalue" + i);
+ objList.add("co str2value" + i);
+
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+
+ Map<String, String> partitionMap = new HashMap<String, String>();
+ partitionMap.put("part1", "p1value8");
+ partitionMap.put("part0", "p0value8");
- runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
+ Exception exc = null;
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
+ } catch (IOException e) {
+ exc = e;
+ }
- //Read should get 10 + 20 + 10 + 10 + 20 rows
- runMRRead(70);
- }
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH, ((HCatException) exc).getErrorType());
- //Test that data inserted through hcatoutputformat is readable from hive
- private void hiveReadTest() throws Exception {
- String query = "select * from " + tableName;
- int retCode = driver.run(query).getResponseCode();
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
- if( retCode != 0 ) {
- throw new Exception("Error " + retCode + " running query " + query);
+ writeRecords = new ArrayList<HCatRecord>();
+
+ for (int i = 0; i < 10; i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("co strvalue" + i);
+
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
+
+ //Read should get 10 + 20 + 10 + 10 + 20 rows
+ runMRRead(70);
}
- ArrayList<String> res = new ArrayList<String>();
- driver.getResults(res);
- assertEquals(70, res.size());
- }
+ //Test that data inserted through HCatOutputFormat is readable from Hive
+ private void hiveReadTest() throws Exception {
+
+ String query = "select * from " + tableName;
+ int retCode = driver.run(query).getResponseCode();
+
+ if (retCode != 0) {
+ throw new Exception("Error " + retCode + " running query " + query);
+ }
+
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ assertEquals(70, res.size());
+ }
}