Posted to hcatalog-commits@incubator.apache.org by ga...@apache.org on 2011/04/12 17:30:12 UTC
svn commit: r1091509 [7/8] - in /incubator/hcatalog/trunk: ./ bin/ ivy/ src/
src/docs/ src/docs/src/ src/docs/src/documentation/
src/docs/src/documentation/classes/ src/docs/src/documentation/conf/
src/docs/src/documentation/content/ src/docs/src/docum...
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/schema/TestHCatSchemaUtils.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/schema/TestHCatSchemaUtils.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/schema/TestHCatSchemaUtils.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/schema/TestHCatSchemaUtils.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.data.schema;
+
+import java.io.PrintStream;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.schema.HCatFieldSchema.Category;
+
+public class TestHCatSchemaUtils extends TestCase {
+
+ public void testSimpleOperation() throws Exception{
+ String typeString = "struct<name:string,studentid:int,"
+ + "contact:struct<phno:string,email:string>,"
+ + "currently_registered_courses:array<string>,"
+ + "current_grades:map<string,string>,"
+ + "phnos:array<struct<phno:string,type:string>>,blah:array<int>>";
+
+ TypeInfo ti = TypeInfoUtils.getTypeInfoFromTypeString(typeString);
+
+ HCatSchema hsch = HCatSchemaUtils.getHCatSchemaFromTypeString(typeString);
+ System.out.println(ti.getTypeName());
+ System.out.println(hsch.toString());
+ assertEquals(ti.getTypeName(),hsch.toString());
+ assertEquals(hsch.toString(),typeString);
+ }
+
+ @SuppressWarnings("unused")
+ private void pretty_print(PrintStream pout, HCatSchema hsch) throws HCatException {
+ pretty_print(pout,hsch,"");
+ }
+
+
+ private void pretty_print(PrintStream pout, HCatSchema hsch, String prefix) throws HCatException {
+ int i = 0;
+ for (HCatFieldSchema field : hsch.getFields()){
+ pretty_print(pout,field,prefix+"."+(field.getName()==null?i:field.getName()));
+ i++;
+ }
+ }
+ private void pretty_print(PrintStream pout, HCatFieldSchema hfsch, String prefix) throws HCatException {
+
+ Category tcat = hfsch.getCategory();
+ if (Category.STRUCT == tcat){
+ pretty_print(pout,hfsch.getStructSubSchema(),prefix);
+ }else if (Category.ARRAY == tcat){
+ pretty_print(pout,hfsch.getArrayElementSchema(),prefix);
+ }else if (Category.MAP == tcat){
+ pout.println(prefix + ".mapkey:\t" + hfsch.getMapKeyType().toString());
+ pretty_print(pout,hfsch.getMapValueSchema(),prefix+".mapvalue:");
+ }else{
+ pout.println(prefix + "\t" + hfsch.getType().toString());
+ }
+ }
+
+}
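
For readers who want to try the conversion outside JUnit, here is a minimal stand-alone sketch of the round-trip this test exercises. It uses only the two entry points called above; the type string itself is an arbitrary example.

    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.data.schema.HCatSchemaUtils;

    public class SchemaRoundTrip {
        public static void main(String[] args) throws Exception {
            // Any valid Hive type string works here.
            String typeString = "struct<id:int,tags:array<string>>";

            // Hive's own parse of the type string ...
            TypeInfo ti = TypeInfoUtils.getTypeInfoFromTypeString(typeString);

            // ... and HCatalog's, which should render back to the same string.
            HCatSchema schema = HCatSchemaUtils.getHCatSchemaFromTypeString(typeString);

            System.out.println(ti.getTypeName()); // struct<id:int,tags:array<string>>
            System.out.println(schema);           // expected to print the same
        }
    }
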
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.rcfile.RCFileInputDriver;
+import org.apache.hcatalog.rcfile.RCFileOutputDriver;
+
+/**
+ * Test for HCatOutputFormat. Writes a partition using HCatOutputFormat and reads
+ * it back using HCatInputFormat, checking the column values and counts.
+ */
+public abstract class HCatMapReduceTest extends TestCase {
+
+ protected String dbName = "default";
+ protected String tableName = "testHowlMapReduceTable";
+
+ protected String inputFormat = RCFileInputFormat.class.getName();
+ protected String outputFormat = RCFileOutputFormat.class.getName();
+ protected String inputSD = RCFileInputDriver.class.getName();
+ protected String outputSD = RCFileOutputDriver.class.getName();
+ protected String serdeClass = ColumnarSerDe.class.getName();
+
+ private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
+ private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
+
+ protected abstract void initialize() throws Exception;
+
+ protected abstract List<FieldSchema> getPartitionKeys();
+
+ protected abstract List<FieldSchema> getTableColumns();
+
+ private HiveMetaStoreClient client;
+ protected HiveConf hiveConf;
+
+ private FileSystem fs;
+ private String thriftUri = null;
+
+ protected Driver driver;
+
+ @Override
+ protected void setUp() throws Exception {
+ hiveConf = new HiveConf(this.getClass());
+
+ //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook
+ //is present only in the ql/test directory
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ driver = new Driver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+
+ thriftUri = System.getenv("HCAT_METASTORE_URI");
+
+ if( thriftUri != null ) {
+ System.out.println("Using URI " + thriftUri);
+
+ hiveConf.set("hive.metastore.local", "false");
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
+ }
+
+ fs = new LocalFileSystem();
+ fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
+
+ initialize();
+
+ client = new HiveMetaStoreClient(hiveConf, null);
+ initTable();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ try {
+ String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+
+ client.dropTable(databaseName, tableName);
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+
+ client.close();
+ }
+
+
+
+ private void initTable() throws Exception {
+
+ String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+
+ try {
+ client.dropTable(databaseName, tableName);
+ } catch(Exception e) {
+ } //can fail with NoSuchObjectException
+
+
+ Table tbl = new Table();
+ tbl.setDbName(databaseName);
+ tbl.setTableName(tableName);
+ tbl.setTableType("MANAGED_TABLE");
+ StorageDescriptor sd = new StorageDescriptor();
+
+ sd.setCols(getTableColumns());
+ tbl.setPartitionKeys(getPartitionKeys());
+
+ tbl.setSd(sd);
+
+ sd.setBucketCols(new ArrayList<String>(2));
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().getParameters().put(
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
+ sd.getSerdeInfo().setSerializationLib(serdeClass);
+ sd.setInputFormat(inputFormat);
+ sd.setOutputFormat(outputFormat);
+
+ Map<String, String> tableParams = new HashMap<String, String>();
+ tableParams.put(HCatConstants.HCAT_ISD_CLASS, inputSD);
+ tableParams.put(HCatConstants.HCAT_OSD_CLASS, outputSD);
+ tbl.setParameters(tableParams);
+
+ client.createTable(tbl);
+ }
+
+  //Create a test input file with the specified number of rows
+ private void createInputFile(Path path, int rowCount) throws IOException {
+
+ if( fs.exists(path) ) {
+ fs.delete(path, true);
+ }
+
+ FSDataOutputStream os = fs.create(path);
+
+ for(int i = 0;i < rowCount;i++) {
+ os.writeChars(i + "\n");
+ }
+
+ os.close();
+ }
+
+ public static class MapCreate extends
+ Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+ static int writeCount = 0; //test will be in local mode
+
+ @Override
+ public void map(LongWritable key, Text value, Context context
+ ) throws IOException, InterruptedException {
+      try {
+        HCatRecord rec = writeRecords.get(writeCount);
+        context.write(null, rec);
+        writeCount++;
+      } catch(Exception e) {
+        e.printStackTrace(System.err); //print since otherwise exception is lost
+        throw new IOException(e);
+      }
+ }
+ }
+
+ public static class MapRead extends
+ Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
+
+ static int readCount = 0; //test will be in local mode
+
+ @Override
+ public void map(WritableComparable key, HCatRecord value, Context context
+ ) throws IOException, InterruptedException {
+      try {
+        readRecords.add(value);
+        readCount++;
+      } catch(Exception e) {
+        e.printStackTrace(); //print since otherwise exception is lost
+        throw new IOException(e);
+      }
+ }
+ }
+
+ void runMRCreate(Map<String, String> partitionValues,
+ List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
+ int writeCount) throws Exception {
+
+ writeRecords = records;
+ MapCreate.writeCount = 0;
+
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "howl mapreduce write test");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(HCatMapReduceTest.MapCreate.class);
+
+ // input/output settings
+ job.setInputFormatClass(TextInputFormat.class);
+
+ Path path = new Path(fs.getWorkingDirectory(), "mapred/testHowlMapReduceInput");
+ createInputFile(path, writeCount);
+
+ TextInputFormat.setInputPaths(job, path);
+
+ job.setOutputFormatClass(HCatOutputFormat.class);
+
+ HCatTableInfo outputInfo = HCatTableInfo.getOutputTableInfo(thriftUri, null, dbName, tableName, partitionValues);
+ HCatOutputFormat.setOutput(job, outputInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(DefaultHCatRecord.class);
+
+ job.setNumReduceTasks(0);
+
+ HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
+
+ //new HowlOutputCommitter(null).setupJob(job);
+ job.waitForCompletion(true);
+ new HCatOutputCommitter(null).cleanupJob(job);
+ Assert.assertEquals(writeCount, MapCreate.writeCount);
+ }
+
+ List<HCatRecord> runMRRead(int readCount) throws Exception {
+ return runMRRead(readCount, null);
+ }
+
+ List<HCatRecord> runMRRead(int readCount, String filter) throws Exception {
+
+ MapRead.readCount = 0;
+ readRecords.clear();
+
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "howl mapreduce read test");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(HCatMapReduceTest.MapRead.class);
+
+ // input/output settings
+ job.setInputFormatClass(HCatInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+
+ HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(
+ thriftUri, null, dbName, tableName, filter);
+ HCatInputFormat.setInput(job, inputInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(Text.class);
+
+ job.setNumReduceTasks(0);
+
+ Path path = new Path(fs.getWorkingDirectory(), "mapred/testHowlMapReduceOutput");
+ if( fs.exists(path) ) {
+ fs.delete(path, true);
+ }
+
+ TextOutputFormat.setOutputPath(job, path);
+
+ job.waitForCompletion(true);
+ Assert.assertEquals(readCount, MapRead.readCount);
+
+ return readRecords;
+ }
+
+
+ protected HCatSchema getTableSchema() throws Exception {
+
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "howl mapreduce read schema test");
+ job.setJarByClass(this.getClass());
+
+ // input/output settings
+ job.setInputFormatClass(HCatInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+
+ HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(thriftUri, null, dbName, tableName);
+ HCatInputFormat.setInput(job, inputInfo);
+
+ return HCatInputFormat.getTableSchema(job);
+ }
+
+}
+
+
+
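Stripped of the input-file plumbing, the write path in runMRCreate above reduces to a handful of calls. A sketch under the same assumptions as the test (a null server URI selects local metastore mode; "mytable" is a placeholder name, not part of this commit):

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.data.schema.HCatFieldSchema;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
    import org.apache.hcatalog.mapreduce.HCatOutputFormat;
    import org.apache.hcatalog.mapreduce.HCatTableInfo;

    public class HCatWriteSketch {
        // columns: schema of the HCatRecords the mapper emits;
        // partitionValues: partition key -> value for the partition being written.
        static void write(List<HCatFieldSchema> columns,
                          Map<String, String> partitionValues) throws Exception {
            Job job = new Job(new Configuration(), "hcat write sketch");
            job.setOutputFormatClass(HCatOutputFormat.class);

            HCatTableInfo outputInfo = HCatTableInfo.getOutputTableInfo(
                    null, null, "default", "mytable", partitionValues);
            HCatOutputFormat.setOutput(job, outputInfo);
            HCatOutputFormat.setSchema(job, new HCatSchema(columns));

            // Mapper and input format setup elided; see runMRCreate above.
            job.waitForCompletion(true);
            new HCatOutputCommitter(null).cleanupJob(job); // publishes the partition
        }
    }
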
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.pig.HCatLoader;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+
+public class TestHCatHiveCompatibility extends TestCase {
+
+ MiniCluster cluster = MiniCluster.buildCluster();
+ private Driver driver;
+ Properties props;
+
+ private HiveMetaStoreClient client;
+
+ String fileName = "/tmp/input.data";
+ String fullFileName;
+
+ @Override
+ protected void setUp() throws Exception {
+
+ HiveConf hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ driver = new Driver(hiveConf);
+ client = new HiveMetaStoreClient(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+ props = new Properties();
+ props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+ fullFileName = cluster.getProperties().getProperty("fs.default.name") + fileName;
+
+ MiniCluster.deleteFile(cluster, fileName);
+ int LOOP_SIZE = 11;
+ String[] input = new String[LOOP_SIZE];
+ for(int i = 0; i < LOOP_SIZE; i++) {
+ input[i] = i + "\t1";
+ }
+ MiniCluster.createInputFile(cluster, fileName, input);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ MiniCluster.deleteFile(cluster, fileName);
+ }
+
+ public void testUnpartedReadWrite() throws Exception{
+
+ driver.run("drop table junit_unparted_noisd");
+ String createTable = "create table junit_unparted_noisd(a int) stored as RCFILE";
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+
+ // assert that the table created has no howl instrumentation, and that we're still able to read it.
+ Table table = client.getTable("default", "junit_unparted_noisd");
+ assertFalse(table.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS));
+ assertTrue(table.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.registerQuery("A = load '"+fullFileName+"' as (a:int);");
+ server.registerQuery("store A into 'default.junit_unparted_noisd' using org.apache.hcatalog.pig.HCatStorer();");
+ server.registerQuery("B = load 'default.junit_unparted_noisd' using "+HCatLoader.class.getName()+"();");
+ Iterator<Tuple> itr= server.openIterator("B");
+
+ int i = 0;
+
+ while(itr.hasNext()){
+ Tuple t = itr.next();
+ assertEquals(1, t.size());
+ assertEquals(t.get(0), i);
+ i++;
+ }
+
+ assertFalse(itr.hasNext());
+ assertEquals(11, i);
+
+ // assert that the table created still has no howl instrumentation
+ Table table2 = client.getTable("default", "junit_unparted_noisd");
+ assertFalse(table2.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS));
+ assertTrue(table2.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
+ driver.run("drop table junit_unparted_noisd");
+ }
+
+ public void testPartedRead() throws Exception{
+
+ driver.run("drop table junit_parted_noisd");
+ String createTable = "create table junit_parted_noisd(a int) partitioned by (b string) stored as RCFILE";
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+
+ // assert that the table created has no howl instrumentation, and that we're still able to read it.
+ Table table = client.getTable("default", "junit_parted_noisd");
+
+ assertFalse(table.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS));
+ assertTrue(table.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.registerQuery("A = load '"+fullFileName+"' as (a:int);");
+ server.registerQuery("store A into 'default.junit_parted_noisd' using org.apache.hcatalog.pig.HCatStorer('b=42');");
+ server.registerQuery("B = load 'default.junit_parted_noisd' using "+HCatLoader.class.getName()+"();");
+ Iterator<Tuple> itr= server.openIterator("B");
+
+ int i = 0;
+
+ while(itr.hasNext()){
+ Tuple t = itr.next();
+ assertEquals(2, t.size());
+ assertEquals(t.get(0), i);
+ assertEquals(t.get(1), "42");
+ i++;
+ }
+
+ assertFalse(itr.hasNext());
+ assertEquals(11, i);
+
+ // assert that the table created still has no howl instrumentation
+ Table table2 = client.getTable("default", "junit_parted_noisd");
+ assertFalse(table2.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS));
+ assertTrue(table2.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
+ // assert that there is one partition present, and it had howl instrumentation inserted when it was created.
+ Partition ptn = client.getPartition("default", "junit_parted_noisd", Arrays.asList("42"));
+
+ assertNotNull(ptn);
+ assertTrue(ptn.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS));
+ assertTrue(ptn.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+ driver.run("drop table junit_unparted_noisd");
+ }
+
+
+}
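
Both tests above lean on the same Pig round-trip: store into a plain Hive table with HCatStorer, then load it back with HCatLoader. The pattern in isolation, with placeholder table and file names (props must carry fs.default.name, as in setUp):

    import java.util.Iterator;
    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.util.UDFContext;

    public class PigHCatRoundTrip {
        static void roundTrip(Properties props, String inputFile) throws Exception {
            PigServer server = new PigServer(ExecType.LOCAL, props);
            UDFContext.getUDFContext().setClientSystemProps();

            // Write through HCatStorer into an existing Hive table ...
            server.registerQuery("A = load '" + inputFile + "' as (a:int);");
            server.registerQuery("store A into 'default.mytable' using "
                    + "org.apache.hcatalog.pig.HCatStorer();");

            // ... then read it back through HCatLoader.
            server.registerQuery("B = load 'default.mytable' using "
                    + "org.apache.hcatalog.pig.HCatLoader();");
            Iterator<Tuple> itr = server.openIterator("B");
            while (itr.hasNext()) {
                System.out.println(itr.next());
            }
        }
    }
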
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+
+public class TestHCatNonPartitioned extends HCatMapReduceTest {
+
+ private List<HCatRecord> writeRecords;
+ List<HCatFieldSchema> partitionColumns;
+
+ @Override
+ protected void initialize() throws HCatException {
+
+ dbName = null; //test if null dbName works ("default" is used)
+ tableName = "testHowlNonPartitionedTable";
+
+ writeRecords = new ArrayList<HCatRecord>();
+
+ for(int i = 0;i < 20;i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("strvalue" + i);
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+ }
+
+ @Override
+ protected List<FieldSchema> getPartitionKeys() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ //empty list, non partitioned
+ return fields;
+ }
+
+ @Override
+ protected List<FieldSchema> getTableColumns() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+ fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+
+ public void testHowlNonPartitionedTable() throws Exception {
+
+ Map<String, String> partitionMap = new HashMap<String, String>();
+ runMRCreate(null, partitionColumns, writeRecords, 10);
+
+ //Test for duplicate publish
+ IOException exc = null;
+ try {
+ runMRCreate(null, partitionColumns, writeRecords, 20);
+ } catch(IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType());
+
+ //Test for publish with invalid partition key name
+ exc = null;
+ partitionMap.clear();
+ partitionMap.put("px", "p1value2");
+
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+ } catch(IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
+
+ //Read should get 10 rows
+ runMRRead(10);
+
+ hiveReadTest();
+ }
+
+  //Test that data inserted through HCatOutputFormat is readable from Hive
+ private void hiveReadTest() throws Exception {
+
+ String query = "select * from " + tableName;
+ int retCode = driver.run(query).getResponseCode();
+
+ if( retCode != 0 ) {
+ throw new Exception("Error " + retCode + " running query " + query);
+ }
+
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ assertEquals(10, res.size());
+ }
+}
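
The duplicate-publish and invalid-partition-key checks above (and the analogous ones in TestHCatPartitioned below) all repeat one assertion pattern. Factored into a helper it could look like the following sketch; the helper and its FailingWrite interface are illustrative, not part of this commit:

    import java.io.IOException;

    import junit.framework.Assert;

    import org.apache.hcatalog.common.ErrorType;
    import org.apache.hcatalog.common.HCatException;

    public class ErrorTypeAssertions {

        // Illustrative callback type for a write that is expected to fail.
        interface FailingWrite {
            void run() throws Exception;
        }

        // Run the write, require that it fails with an HCatException, and
        // check the HCatalog error type it carries.
        static void assertFailsWith(ErrorType expected, FailingWrite write)
                throws Exception {
            IOException exc = null;
            try {
                write.run();
            } catch (IOException e) {
                exc = e;
            }
            Assert.assertNotNull(exc);
            Assert.assertTrue(exc instanceof HCatException);
            Assert.assertEquals(expected, ((HCatException) exc).getErrorType());
        }
    }

With such a helper, the duplicate-publish block above would shrink to a single assertFailsWith(ErrorType.ERROR_NON_EMPTY_TABLE, ...) call around the second runMRCreate.
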
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.rcfile.RCFileOutputDriver;
+
+public class TestHCatOutputFormat extends TestCase {
+ private HiveMetaStoreClient client;
+ private HiveConf hiveConf;
+
+ private static final String dbName = "howlOutputFormatTestDB";
+ private static final String tblName = "howlOutputFormatTestTable";
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ hiveConf = new HiveConf(this.getClass());
+
+ try {
+ client = new HiveMetaStoreClient(hiveConf, null);
+
+ initTable();
+ } catch (Throwable e) {
+ System.err.println("Unable to open the metastore");
+ System.err.println(StringUtils.stringifyException(e));
+ throw new Exception(e);
+ }
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ try {
+ super.tearDown();
+ client.dropTable(dbName, tblName);
+ client.dropDatabase(dbName);
+
+ client.close();
+ } catch (Throwable e) {
+ System.err.println("Unable to close metastore");
+ System.err.println(StringUtils.stringifyException(e));
+ throw new Exception(e);
+ }
+ }
+
+ private void initTable() throws Exception {
+
+ try {
+ client.dropTable(dbName, tblName);
+ } catch(Exception e) {}
+ try {
+ client.dropDatabase(dbName);
+ } catch(Exception e) {}
+ client.createDatabase(new Database(dbName, "", null,null));
+ assertNotNull((client.getDatabase(dbName).getLocationUri()));
+
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("colname", Constants.STRING_TYPE_NAME, ""));
+
+ Table tbl = new Table();
+ tbl.setDbName(dbName);
+ tbl.setTableName(tblName);
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setCols(fields);
+ tbl.setSd(sd);
+
+ //sd.setLocation("hdfs://tmp");
+ sd.setParameters(new HashMap<String, String>());
+ sd.getParameters().put("test_param_1", "Use this for comments etc");
+ sd.setBucketCols(new ArrayList<String>(2));
+ sd.getBucketCols().add("name");
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().getParameters().put(
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
+ sd.getSerdeInfo().setSerializationLib(
+ org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
+ tbl.setPartitionKeys(fields);
+
+ Map<String, String> tableParams = new HashMap<String, String>();
+ tableParams.put(HCatConstants.HCAT_OSD_CLASS, RCFileOutputDriver.class.getName());
+ tableParams.put(HCatConstants.HCAT_ISD_CLASS, "testInputClass");
+ tableParams.put("hcat.testarg", "testArgValue");
+
+ tbl.setParameters(tableParams);
+
+ client.createTable(tbl);
+ Path tblPath = new Path(client.getTable(dbName, tblName).getSd().getLocation());
+ assertTrue(tblPath.getFileSystem(hiveConf).mkdirs(new Path(tblPath,"colname=p1")));
+
+ }
+
+ public void testSetOutput() throws Exception {
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "test outputformat");
+
+ Map<String, String> partitionValues = new HashMap<String, String>();
+ partitionValues.put("colname", "p1");
+ //null server url means local mode
+ HCatTableInfo info = HCatTableInfo.getOutputTableInfo(null, null, dbName, tblName, partitionValues);
+
+ HCatOutputFormat.setOutput(job, info);
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(job);
+
+ assertNotNull(jobInfo.getTableInfo());
+ assertEquals(1, jobInfo.getTableInfo().getPartitionValues().size());
+ assertEquals("p1", jobInfo.getTableInfo().getPartitionValues().get("colname"));
+ assertEquals(1, jobInfo.getTableSchema().getFields().size());
+ assertEquals("colname", jobInfo.getTableSchema().getFields().get(0).getName());
+
+ StorerInfo storer = jobInfo.getStorerInfo();
+ assertEquals(RCFileOutputDriver.class.getName(), storer.getOutputSDClass());
+
+ publishTest(job);
+ }
+
+ public void publishTest(Job job) throws Exception {
+ OutputCommitter committer = new HCatOutputCommitter(null);
+ committer.cleanupJob(job);
+
+ Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));
+ assertNotNull(part);
+
+ StorerInfo storer = InitializeInput.extractStorerInfo(part.getSd(),part.getParameters());
+ assertEquals(storer.getInputSDClass(), "testInputClass");
+ assertEquals(storer.getProperties().get("hcat.testarg"), "testArgValue");
+ assertTrue(part.getSd().getLocation().indexOf("p1") != -1);
+ }
+}
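
testSetOutput above comes down to: describe the target partition, hand it to HCatOutputFormat, and read everything back from the serialized job info. A trimmed sketch with placeholder database and table names:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.mapreduce.HCatOutputFormat;
    import org.apache.hcatalog.mapreduce.HCatTableInfo;
    import org.apache.hcatalog.mapreduce.OutputJobInfo;

    public class SetOutputSketch {
        static void describeOutput() throws Exception {
            Job job = new Job(new Configuration(), "set output sketch");

            Map<String, String> partitionValues = new HashMap<String, String>();
            partitionValues.put("colname", "p1");

            // null server URL means local metastore mode, as in the test.
            HCatTableInfo info = HCatTableInfo.getOutputTableInfo(
                    null, null, "mydb", "mytable", partitionValues);
            HCatOutputFormat.setOutput(job, info);

            // setOutput serialized the table schema, partition values and
            // storer info into the job; they can be read back at any point.
            OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(job);
            System.out.println(jobInfo.getTableSchema());
            System.out.println(jobInfo.getTableInfo().getPartitionValues());
        }
    }
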
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+
+public class TestHCatPartitioned extends HCatMapReduceTest {
+
+ private List<HCatRecord> writeRecords;
+ private List<HCatFieldSchema> partitionColumns;
+
+ @Override
+ protected void initialize() throws Exception {
+
+ tableName = "testHowlPartitionedTable";
+ writeRecords = new ArrayList<HCatRecord>();
+
+ for(int i = 0;i < 20;i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("strvalue" + i);
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+ }
+
+
+ @Override
+ protected List<FieldSchema> getPartitionKeys() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+ @Override
+ protected List<FieldSchema> getTableColumns() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+ fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+
+ public void testHowlPartitionedTable() throws Exception {
+
+ Map<String, String> partitionMap = new HashMap<String, String>();
+ partitionMap.put("part1", "p1value1");
+
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+
+ partitionMap.clear();
+ partitionMap.put("PART1", "p1value2");
+
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+
+ //Test for duplicate publish
+ IOException exc = null;
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+ } catch(IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) exc).getErrorType());
+
+ //Test for publish with invalid partition key name
+ exc = null;
+ partitionMap.clear();
+ partitionMap.put("px", "p1value2");
+
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+ } catch(IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType());
+
+
+ //Test for null partition value map
+ exc = null;
+ try {
+ runMRCreate(null, partitionColumns, writeRecords, 20);
+ } catch(IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
+
+ //Read should get 10 + 20 rows
+ runMRRead(30);
+
+ //Read with partition filter
+ runMRRead(10, "part1 = \"p1value1\"");
+ runMRRead(20, "part1 = \"p1value2\"");
+ runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\"");
+
+ tableSchemaTest();
+ columnOrderChangeTest();
+ hiveReadTest();
+ }
+
+
+  //test that new columns get added to the table schema
+ private void tableSchemaTest() throws Exception {
+
+ HCatSchema tableSchema = getTableSchema();
+
+ assertEquals(3, tableSchema.getFields().size());
+
+ //Update partition schema to have 3 fields
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
+
+ writeRecords = new ArrayList<HCatRecord>();
+
+ for(int i = 0;i < 20;i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("strvalue" + i);
+ objList.add("str2value" + i);
+
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+
+ Map<String, String> partitionMap = new HashMap<String, String>();
+ partitionMap.put("part1", "p1value5");
+
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+
+ tableSchema = getTableSchema();
+
+ //assert that c3 has got added to table schema
+ assertEquals(4, tableSchema.getFields().size());
+ assertEquals("c1", tableSchema.getFields().get(0).getName());
+ assertEquals("c2", tableSchema.getFields().get(1).getName());
+ assertEquals("c3", tableSchema.getFields().get(2).getName());
+ assertEquals("part1", tableSchema.getFields().get(3).getName());
+
+ //Test that changing column data type fails
+ partitionMap.clear();
+ partitionMap.put("part1", "p1value6");
+
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.INT_TYPE_NAME, "")));
+
+ IOException exc = null;
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+ } catch(IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, ((HCatException) exc).getErrorType());
+
+ //Test that partition key is not allowed in data
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("part1", Constants.STRING_TYPE_NAME, "")));
+
+ List<HCatRecord> recordsContainingPartitionCols = new ArrayList<HCatRecord>(20);
+ for(int i = 0;i < 20;i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("c2value" + i);
+ objList.add("c3value" + i);
+ objList.add("p1value6");
+
+ recordsContainingPartitionCols.add(new DefaultHCatRecord(objList));
+ }
+
+ exc = null;
+ try {
+ runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20);
+ } catch(IOException e) {
+ exc = e;
+ }
+
+ List<HCatRecord> records= runMRRead(20,"part1 = \"p1value6\"");
+ assertEquals(20, records.size());
+ Integer i =0;
+ for(HCatRecord rec : records){
+ assertEquals(4, rec.size());
+ assertTrue(rec.get(0).equals(i));
+ assertTrue(rec.get(1).equals("c2value"+i));
+ assertTrue(rec.get(2).equals("c3value"+i));
+ assertTrue(rec.get(3).equals("p1value6"));
+ i++;
+ }
+ }
+
+  //check behavior when changing the order of columns
+ private void columnOrderChangeTest() throws Exception {
+
+ HCatSchema tableSchema = getTableSchema();
+
+ assertEquals(4, tableSchema.getFields().size());
+
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+
+
+ writeRecords = new ArrayList<HCatRecord>();
+
+ for(int i = 0;i < 10;i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("co strvalue" + i);
+ objList.add("co str2value" + i);
+
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+
+ Map<String, String> partitionMap = new HashMap<String, String>();
+ partitionMap.put("part1", "p1value8");
+
+
+ Exception exc = null;
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+ } catch(IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH, ((HCatException) exc).getErrorType());
+
+
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+
+ writeRecords = new ArrayList<HCatRecord>();
+
+ for(int i = 0;i < 10;i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("co strvalue" + i);
+
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+
+ //Read should get 10 + 20 + 10 + 10 + 20 rows
+ runMRRead(70);
+ }
+
+  //Test that data inserted through HCatOutputFormat is readable from Hive
+ private void hiveReadTest() throws Exception {
+
+ String query = "select * from " + tableName;
+ int retCode = driver.run(query).getResponseCode();
+
+ if( retCode != 0 ) {
+ throw new Exception("Error " + retCode + " running query " + query);
+ }
+
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ assertEquals(70, res.size());
+ }
+}
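
The filtered reads above (runMRRead(10, "part1 = \"p1value1\"") and friends) hinge on one detail: the filter string travels inside the HCatTableInfo. A minimal read-side sketch, with the table name again a placeholder:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.HCatTableInfo;

    public class FilteredReadSketch {
        static HCatSchema describeRead(String filter) throws Exception {
            Job job = new Job(new Configuration(), "filtered read sketch");
            job.setInputFormatClass(HCatInputFormat.class);

            // filter, e.g. "part1 = \"p1value1\"", restricts which partitions
            // are read; pass null to read the whole table.
            HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(
                    null, null, "default", "mytable", filter);
            HCatInputFormat.setInput(job, inputInfo);

            // The table schema is the columns plus the partition keys, as
            // tableSchemaTest above verifies.
            return HCatInputFormat.getTableSchema(job);
        }
    }
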
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hcatalog.pig.drivers.PigStorageInputDriver;
+
+public class MyPigStorageDriver extends PigStorageInputDriver{
+
+ @Override
+ public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
+ if ( !"control-A".equals(storageDriverArgs.getProperty(PigStorageInputDriver.delim))){
+   /* This is the only way to make the testcase fail. An exception
+    * thrown from here does not propagate up.
+ */
+ System.exit(1);
+ }
+ super.initialize(context, storageDriverArgs);
+ }
+}
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.data.Pair;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.UDFContext;
+
+public class TestHCatLoader extends TestCase {
+
+ private static final String BASIC_TABLE = "junit_unparted_basic";
+ private static final String COMPLEX_TABLE = "junit_unparted_complex";
+ private static final String PARTITIONED_TABLE = "junit_parted_basic";
+ private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static Driver driver;
+ private static Properties props;
+
+ private static final String basicFile = "/tmp/basic.input.data";
+ private static final String complexFile = "/tmp/complex.input.data";
+ private static String fullFileNameBasic;
+ private static String fullFileNameComplex;
+
+ private static int guardTestCount = 5; // ugh, instantiate using introspection in guardedSetupBeforeClass
+ private static boolean setupHasRun = false;
+
+ private static Map<Integer,Pair<Integer,String>> basicInputData;
+
+ private void dropTable(String tablename) throws IOException{
+ driver.run("drop table "+tablename);
+ }
+ private void createTable(String tablename, String schema, String partitionedBy) throws IOException{
+ String createTable;
+ createTable = "create table "+tablename+"("+schema+") ";
+ if ((partitionedBy != null)&&(!partitionedBy.trim().isEmpty())){
+ createTable = createTable + "partitioned by ("+partitionedBy+") ";
+ }
+ createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
+ "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table. ["+createTable+"], return code from hive driver : ["+retCode+"]");
+ }
+ }
+
+ private void createTable(String tablename, String schema) throws IOException{
+ createTable(tablename,schema,null);
+ }
+
+ protected void guardedSetUpBeforeClass() throws Exception {
+ if (!setupHasRun){
+ setupHasRun = true;
+ }else{
+ return;
+ }
+
+ HiveConf hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ driver = new Driver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+ props = new Properties();
+ props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+ fullFileNameBasic = cluster.getProperties().getProperty("fs.default.name") + basicFile;
+ fullFileNameComplex = cluster.getProperties().getProperty("fs.default.name") + complexFile;
+
+ cleanup();
+
+ createTable(BASIC_TABLE,"a int, b string");
+ createTable(COMPLEX_TABLE,
+ "name string, studentid int, "
+ + "contact struct<phno:string,email:string>, "
+ + "currently_registered_courses array<string>, "
+ + "current_grades map<string,string>, "
+ + "phnos array<struct<phno:string,type:string>>");
+
+ createTable(PARTITIONED_TABLE,"a int, b string","bkt string");
+
+
+ int LOOP_SIZE = 3;
+ String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+ basicInputData = new HashMap<Integer,Pair<Integer,String>>();
+ int k = 0;
+ for(int i = 1; i <= LOOP_SIZE; i++) {
+ String si = i + "";
+ for(int j=1;j<=LOOP_SIZE;j++) {
+ String sj = "S"+j+"S";
+ input[k] = si + "\t" + sj;
+ basicInputData.put(k, new Pair<Integer,String>(i,sj));
+ k++;
+ }
+ }
+ MiniCluster.createInputFile(cluster, basicFile, input);
+
+ MiniCluster.createInputFile(cluster, complexFile,
+ new String[]{
+ //"Henry Jekyll\t42\t(415-253-6367,hjekyll@contemporary.edu.uk)\t{(PHARMACOLOGY),(PSYCHIATRY)},[PHARMACOLOGY#A-,PSYCHIATRY#B+],{(415-253-6367,cell),(408-253-6367,landline)}",
+ //"Edward Hyde\t1337\t(415-253-6367,anonymous@b44chan.org)\t{(CREATIVE_WRITING),(COPYRIGHT_LAW)},[CREATIVE_WRITING#A+,COPYRIGHT_LAW#D],{(415-253-6367,cell),(408-253-6367,landline)}",
+ }
+ );
+
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server.registerQuery("A = load '"+fullFileNameBasic+"' as (a:int, b:chararray);");
+
+ server.registerQuery("store A into '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
+ server.registerQuery("B = foreach A generate a,b;");
+ server.registerQuery("B2 = filter B by a < 2;");
+ server.registerQuery("store B2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=0');");
+
+ server.registerQuery("C = foreach A generate a,b;");
+ server.registerQuery("C2 = filter C by a >= 2;");
+ server.registerQuery("store C2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=1');");
+
+ server.registerQuery("D = load '"+fullFileNameComplex+"' as (name:chararray, studentid:int, contact:tuple(phno:chararray,email:chararray), currently_registered_courses:bag{innertup:tuple(course:chararray)}, current_grades:map[ ] , phnos :bag{innertup:tuple(phno:chararray,type:chararray)});");
+ server.registerQuery("store D into '"+COMPLEX_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
+ server.executeBatch();
+
+ }
+ private void cleanup() throws IOException {
+ MiniCluster.deleteFile(cluster, basicFile);
+ MiniCluster.deleteFile(cluster, complexFile);
+ dropTable(BASIC_TABLE);
+ dropTable(COMPLEX_TABLE);
+ dropTable(PARTITIONED_TABLE);
+ }
+
+ protected void guardedTearDownAfterClass() throws Exception {
+ guardTestCount--;
+ if (guardTestCount > 0){
+ return;
+ }
+ cleanup();
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ guardedSetUpBeforeClass();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ guardedTearDownAfterClass();
+ }
+
+ public void testSchemaLoadBasic() throws IOException{
+
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+
+ // test that schema was loaded correctly
+ server.registerQuery("X = load '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
+ Schema dumpedXSchema = server.dumpSchema("X");
+ List<FieldSchema> Xfields = dumpedXSchema.getFields();
+ assertEquals(2,Xfields.size());
+ assertTrue(Xfields.get(0).alias.equalsIgnoreCase("a"));
+ assertTrue(Xfields.get(0).type == DataType.INTEGER);
+ assertTrue(Xfields.get(1).alias.equalsIgnoreCase("b"));
+ assertTrue(Xfields.get(1).type == DataType.CHARARRAY);
+
+ }
+
+ public void testReadDataBasic() throws IOException {
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+
+ server.registerQuery("X = load '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
+ Iterator<Tuple> XIter = server.openIterator("X");
+ int numTuplesRead = 0;
+ while( XIter.hasNext() ){
+ Tuple t = XIter.next();
+ assertEquals(2,t.size());
+ assertTrue(t.get(0).getClass() == Integer.class);
+ assertTrue(t.get(1).getClass() == String.class);
+ assertEquals(t.get(0),basicInputData.get(numTuplesRead).first);
+ assertEquals(t.get(1),basicInputData.get(numTuplesRead).second);
+ numTuplesRead++;
+ }
+ assertEquals(basicInputData.size(),numTuplesRead);
+ }
+
+ public void testSchemaLoadComplex() throws IOException{
+
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+
+ // test that schema was loaded correctly
+ server.registerQuery("K = load '"+COMPLEX_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
+ Schema dumpedKSchema = server.dumpSchema("K");
+ List<FieldSchema> Kfields = dumpedKSchema.getFields();
+ assertEquals(6,Kfields.size());
+
+ assertEquals(DataType.CHARARRAY,Kfields.get(0).type);
+ assertEquals("name",Kfields.get(0).alias.toLowerCase());
+
+ assertEquals( DataType.INTEGER,Kfields.get(1).type);
+ assertEquals("studentid",Kfields.get(1).alias.toLowerCase());
+
+ assertEquals(DataType.TUPLE,Kfields.get(2).type);
+ assertEquals("contact",Kfields.get(2).alias.toLowerCase());
+ {
+ assertNotNull(Kfields.get(2).schema);
+ assertTrue(Kfields.get(2).schema.getFields().size() == 2);
+ assertTrue(Kfields.get(2).schema.getFields().get(0).type == DataType.CHARARRAY);
+ assertTrue(Kfields.get(2).schema.getFields().get(0).alias.equalsIgnoreCase("phno"));
+ assertTrue(Kfields.get(2).schema.getFields().get(1).type == DataType.CHARARRAY);
+ assertTrue(Kfields.get(2).schema.getFields().get(1).alias.equalsIgnoreCase("email"));
+ }
+ assertEquals(DataType.BAG,Kfields.get(3).type);
+ assertEquals("currently_registered_courses",Kfields.get(3).alias.toLowerCase());
+ {
+ assertNotNull(Kfields.get(3).schema);
+ assertEquals(1,Kfields.get(3).schema.getFields().size());
+ assertEquals(DataType.TUPLE,Kfields.get(3).schema.getFields().get(0).type);
+ assertNotNull(Kfields.get(3).schema.getFields().get(0).schema);
+ assertEquals(1,Kfields.get(3).schema.getFields().get(0).schema.getFields().size());
+ assertEquals(DataType.CHARARRAY,Kfields.get(3).schema.getFields().get(0).schema.getFields().get(0).type);
+ // assertEquals("course",Kfields.get(3).schema.getFields().get(0).schema.getFields().get(0).alias.toLowerCase());
+ // commented out, because the name becomes "innerfield" by default - we call it "course" in pig,
+ // but in the metadata, it'd be anonymous, so this would be autogenerated, which is fine
+ }
+ assertEquals(DataType.MAP,Kfields.get(4).type);
+ assertEquals("current_grades",Kfields.get(4).alias.toLowerCase());
+ assertEquals(DataType.BAG,Kfields.get(5).type);
+ assertEquals("phnos",Kfields.get(5).alias.toLowerCase());
+ {
+ assertNotNull(Kfields.get(5).schema);
+ assertEquals(1,Kfields.get(5).schema.getFields().size());
+ assertEquals(DataType.TUPLE,Kfields.get(5).schema.getFields().get(0).type);
+ assertNotNull(Kfields.get(5).schema.getFields().get(0).schema);
+ assertTrue(Kfields.get(5).schema.getFields().get(0).schema.getFields().size() == 2);
+ assertEquals(DataType.CHARARRAY,Kfields.get(5).schema.getFields().get(0).schema.getFields().get(0).type);
+ assertEquals("phno",Kfields.get(5).schema.getFields().get(0).schema.getFields().get(0).alias.toLowerCase());
+ assertEquals(DataType.CHARARRAY,Kfields.get(5).schema.getFields().get(0).schema.getFields().get(1).type);
+ assertEquals("type",Kfields.get(5).schema.getFields().get(0).schema.getFields().get(1).alias.toLowerCase());
+ }
+
+ }
+
+ public void testReadPartitionedBasic() throws IOException {
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+
+ driver.run("select * from "+PARTITIONED_TABLE);
+ ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ assertEquals(basicInputData.size(),valuesReadFromHiveDriver.size());
+
+ server.registerQuery("W = load '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
+ Schema dumpedWSchema = server.dumpSchema("W");
+ List<FieldSchema> Wfields = dumpedWSchema.getFields();
+ assertEquals(3,Wfields.size());
+ assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
+ assertTrue(Wfields.get(0).type == DataType.INTEGER);
+ assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
+ assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
+ assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
+ assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
+
+ Iterator<Tuple> WIter = server.openIterator("W");
+ Collection<Pair<Integer,String>> valuesRead = new ArrayList<Pair<Integer,String>>();
+ while( WIter.hasNext() ){
+ Tuple t = WIter.next();
+ assertTrue(t.size() == 3);
+ assertTrue(t.get(0).getClass() == Integer.class);
+ assertTrue(t.get(1).getClass() == String.class);
+ assertTrue(t.get(2).getClass() == String.class);
+ valuesRead.add(new Pair<Integer,String>((Integer)t.get(0),(String)t.get(1)));
+ if ((Integer)t.get(0) < 2){
+ assertEquals("0",t.get(2));
+ }else{
+ assertEquals("1",t.get(2));
+ }
+ }
+ assertEquals(valuesReadFromHiveDriver.size(),valuesRead.size());
+
+ server.registerQuery("P1 = load '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
+ server.registerQuery("P1filter = filter P1 by bkt == '0';");
+ Iterator<Tuple> P1Iter = server.openIterator("P1filter");
+ int count1 = 0;
+ while( P1Iter.hasNext() ) {
+ Tuple t = P1Iter.next();
+
+ assertEquals("0", t.get(2));
+ assertEquals(1, t.get(0));
+ count1++;
+ }
+ assertEquals(3, count1);
+
+ server.registerQuery("P2 = load '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
+ server.registerQuery("P2filter = filter P2 by bkt == '1';");
+ Iterator<Tuple> P2Iter = server.openIterator("P2filter");
+ int count2 = 0;
+ while( P2Iter.hasNext() ) {
+ Tuple t = P2Iter.next();
+
+ assertEquals("1", t.get(2));
+ assertTrue(((Integer) t.get(0)) > 1);
+ count2++;
+ }
+ assertEquals(6, count2);
+ }
+
+ public void testProjectionsBasic() throws IOException {
+
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+
+ // projections are handled by using generate, not "as" on the Load
+
+ server.registerQuery("Y1 = load '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
+ server.registerQuery("Y2 = foreach Y1 generate a;");
+ server.registerQuery("Y3 = foreach Y1 generate b,a;");
+ Schema dumpedY2Schema = server.dumpSchema("Y2");
+ Schema dumpedY3Schema = server.dumpSchema("Y3");
+ List<FieldSchema> Y2fields = dumpedY2Schema.getFields();
+ List<FieldSchema> Y3fields = dumpedY3Schema.getFields();
+ assertEquals(1,Y2fields.size());
+ assertEquals("a",Y2fields.get(0).alias.toLowerCase());
+ assertEquals(DataType.INTEGER,Y2fields.get(0).type);
+ assertEquals(2,Y3fields.size());
+ assertEquals("b",Y3fields.get(0).alias.toLowerCase());
+ assertEquals(DataType.CHARARRAY,Y3fields.get(0).type);
+ assertEquals("a",Y3fields.get(1).alias.toLowerCase());
+ assertEquals(DataType.INTEGER,Y3fields.get(1).type);
+
+ int numTuplesRead = 0;
+ Iterator<Tuple> Y2Iter = server.openIterator("Y2");
+ while( Y2Iter.hasNext() ){
+ Tuple t = Y2Iter.next();
+ assertEquals(t.size(),1);
+ assertTrue(t.get(0).getClass() == Integer.class);
+ assertEquals(t.get(0),basicInputData.get(numTuplesRead).first);
+ numTuplesRead++;
+ }
+ numTuplesRead = 0;
+ Iterator<Tuple> Y3Iter = server.openIterator("Y3");
+ while( Y3Iter.hasNext() ){
+ Tuple t = Y3Iter.next();
+ assertEquals(t.size(),2);
+ assertTrue(t.get(0).getClass() == String.class);
+ assertEquals(t.get(0),basicInputData.get(numTuplesRead).second);
+ assertTrue(t.get(1).getClass() == Integer.class);
+ assertEquals(t.get(1),basicInputData.get(numTuplesRead).first);
+ numTuplesRead++;
+ }
+ assertEquals(basicInputData.size(),numTuplesRead);
+ }
+}
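
As testProjectionsBasic notes, projection on an HCatLoader relation is expressed with foreach/generate rather than an 'as' clause on the load. The pattern in isolation; the table name is a placeholder assumed to have the same a/b columns as BASIC_TABLE above:

    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    public class ProjectionSketch {
        static void project(Properties props) throws Exception {
            PigServer server = new PigServer(ExecType.LOCAL, props);

            // The load takes its schema from the metastore; no 'as' clause.
            server.registerQuery("Y1 = load 'mytable' using "
                    + "org.apache.hcatalog.pig.HCatLoader();");

            // Project and reorder columns with generate.
            server.registerQuery("Y2 = foreach Y1 generate a;");
            server.registerQuery("Y3 = foreach Y1 generate b, a;");

            Schema projected = server.dumpSchema("Y3");
            System.out.println(projected); // b first, then a, per the generate order
        }
    }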