Posted to hcatalog-commits@incubator.apache.org by ga...@apache.org on 2011/04/12 17:30:12 UTC
svn commit: r1091509 [8/8] - in /incubator/hcatalog/trunk: ./ bin/ ivy/ src/
src/docs/ src/docs/src/ src/docs/src/documentation/
src/docs/src/documentation/classes/ src/docs/src/documentation/conf/
src/docs/src/documentation/content/ src/docs/src/docum...
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,587 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.rcfile.RCFileInputDriver;
+import org.apache.hcatalog.rcfile.RCFileOutputDriver;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.impl.util.UDFContext;
+
+public class TestHCatStorer extends TestCase {
+
+ MiniCluster cluster = MiniCluster.buildCluster();
+ private Driver driver;
+ Properties props;
+
+ @Override
+ protected void setUp() throws Exception {
+
+ HiveConf hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ driver = new Driver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+ props = new Properties();
+ props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+ fullFileName = cluster.getProperties().getProperty("fs.default.name") + fileName;
+ }
+
+ String fileName = "/tmp/input.data";
+ String fullFileName;
+
+
+// public void testStoreFuncMap() throws IOException{
+//
+// driver.run("drop table junit_unparted");
+// String createTable = "create table junit_unparted(b string,arr_of_maps array<map<string,string>>) stored as RCFILE " +
+// "tblproperties('hcat.isd'='org.apache.hadoop.hive.hCatalog.rcfile.RCFileInputStorageDriver'," +
+// "'hcat.osd'='org.apache.hadoop.hive.hCatalog.rcfile.RCFileOutputStorageDriver') ";
+// int retCode = driver.run(createTable).getResponseCode();
+// if(retCode != 0) {
+// throw new IOException("Failed to create table.");
+// }
+//
+// MiniCluster.deleteFile(cluster, fileName);
+// MiniCluster.createInputFile(cluster, fileName, new String[]{"test\t{([a#hadoop,b#pig])}","data\t{([b#hive,a#howl])}"});
+//
+// PigServer server = new PigServer(ExecType.LOCAL, props);
+// UDFContext.getUDFContext().setClientSystemProps();
+// server.setBatchOn();
+// server.registerQuery("A = load '"+ fullFileName +"' as (b:chararray,arr_of_maps:bag{mytup:tuple ( mymap:map[ ])});");
+// server.registerQuery("store A into 'default.junit_unparted' using org.apache.hadoop.hive.hCatalog.pig.HCatStorer('','b:chararray,arr_of_maps:bag{mytup:tuple ( mymap:map[ ])}');");
+// server.executeBatch();
+//
+//
+//
+// MiniCluster.deleteFile(cluster, fileName);
+//
+// driver.run("select * from junit_unparted");
+// ArrayList<String> res = new ArrayList<String>();
+// driver.getResults(res);
+// driver.run("drop table junit_unparted");
+// Iterator<String> itr = res.iterator();
+// System.out.println(itr.next());
+// System.out.println(itr.next());
+// assertFalse(itr.hasNext());
+//
+// }
+
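+ // Stores rows whose partition column value ('b') is also present in the input
+ // data, then reads the table back with HCatLoader and verifies both columns.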
+ public void testPartColsInData() throws IOException{
+
+ driver.run("drop table junit_unparted");
+ String createTable = "create table junit_unparted(a int) partitioned by (b string) stored as RCFILE " +
+ "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+ "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+ MiniCluster.deleteFile(cluster, fileName);
+ int LOOP_SIZE = 11;
+ String[] input = new String[LOOP_SIZE];
+ for(int i = 0; i < LOOP_SIZE; i++) {
+ input[i] = i + "\t1";
+ }
+ MiniCluster.createInputFile(cluster, fileName, input);
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.registerQuery("A = load '"+fullFileName+"' as (a:int, b:chararray);");
+ server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('b=1');");
+ server.registerQuery("B = load 'default.junit_unparted' using "+HCatLoader.class.getName()+"();");
+ Iterator<Tuple> itr= server.openIterator("B");
+
+ int i = 0;
+
+ while(itr.hasNext()){
+ Tuple t = itr.next();
+ assertEquals(2, t.size());
+ assertEquals(t.get(0), i);
+ assertEquals(t.get(1), "1");
+ i++;
+ }
+
+ assertFalse(itr.hasNext());
+ assertEquals(11, i);
+ MiniCluster.deleteFile(cluster, fileName);
+ }
+
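+ // Splits the input by emp_state and stores each subset into its own
+ // (emp_country, emp_state) partition, then verifies all rows via the Hive driver.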
+ public void testMultiPartColsInData() throws IOException{
+
+ driver.run("drop table employee");
+ String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
+ " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE " +
+ "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+ "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+
+ MiniCluster.deleteFile(cluster, fullFileName);
+ String[] inputData = {"111237\tKrishna\t01/01/1990\tM\tIN\tTN",
+ "111238\tKalpana\t01/01/2000\tF\tIN\tKA",
+ "111239\tSatya\t01/01/2001\tM\tIN\tKL",
+ "111240\tKavya\t01/01/2002\tF\tIN\tAP"};
+
+ MiniCluster.createInputFile(cluster, fullFileName, inputData);
+ PigServer pig = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ pig.setBatchOn();
+ pig.registerQuery("A = LOAD '"+fullFileName+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
+ "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
+ pig.registerQuery("TN = FILTER A BY emp_state == 'TN';");
+ pig.registerQuery("KA = FILTER A BY emp_state == 'KA';");
+ pig.registerQuery("KL = FILTER A BY emp_state == 'KL';");
+ pig.registerQuery("AP = FILTER A BY emp_state == 'AP';");
+ pig.registerQuery("STORE TN INTO 'employee' USING "+HCatStorer.class.getName()+"('emp_country=IN,emp_state=TN');");
+ pig.registerQuery("STORE KA INTO 'employee' USING "+HCatStorer.class.getName()+"('emp_country=IN,emp_state=KA');");
+ pig.registerQuery("STORE KL INTO 'employee' USING "+HCatStorer.class.getName()+"('emp_country=IN,emp_state=KL');");
+ pig.registerQuery("STORE AP INTO 'employee' USING "+HCatStorer.class.getName()+"('emp_country=IN,emp_state=AP');");
+ pig.executeBatch();
+ driver.run("select * from employee");
+ ArrayList<String> results = new ArrayList<String>();
+ driver.getResults(results);
+ assertEquals(4, results.size());
+ Collections.sort(results);
+ assertEquals(inputData[0], results.get(0));
+ assertEquals(inputData[1], results.get(1));
+ assertEquals(inputData[2], results.get(2));
+ assertEquals(inputData[3], results.get(3));
+ MiniCluster.deleteFile(cluster, fullFileName);
+ driver.run("drop table employee");
+ }
+
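+ // Stores a single-column relation into the static partition b=1 and reads it
+ // back with HCatLoader, expecting the partition value to appear as a column.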
+ public void testStoreInPartitionedTbl() throws IOException{
+
+ driver.run("drop table junit_unparted");
+ String createTable = "create table junit_unparted(a int) partitioned by (b string) stored as RCFILE " +
+ "tblproperties('hcat.isd'='"+RCFileInputDriver.class.getName()+"'," +
+ "'"+HCatConstants.HCAT_OSD_CLASS+"'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+ MiniCluster.deleteFile(cluster, fileName);
+ int LOOP_SIZE = 11;
+ String[] input = new String[LOOP_SIZE];
+ for(int i = 0; i < LOOP_SIZE; i++) {
+ input[i] = i+"";
+ }
+ MiniCluster.createInputFile(cluster, fileName, input);
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.registerQuery("A = load '"+fullFileName+"' as (a:int);");
+ server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('b=1');");
+ server.registerQuery("B = load 'default.junit_unparted' using "+HCatLoader.class.getName()+"();");
+ Iterator<Tuple> itr= server.openIterator("B");
+
+ int i = 0;
+
+ while(itr.hasNext()){
+ Tuple t = itr.next();
+ assertEquals(2, t.size());
+ assertEquals(t.get(0), i);
+ assertEquals(t.get(1), "1");
+ i++;
+ }
+
+ assertFalse(itr.hasNext());
+ assertEquals(11, i);
+ MiniCluster.deleteFile(cluster, fileName);
+ }
+
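+ // Verifies that HCatStorer rejects schemas with unnamed columns and with
+ // uppercase column names, raising a FrontendException in both cases.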
+ public void testNoAlias() throws IOException{
+ driver.run("drop table junit_parted");
+ String createTable = "create table junit_parted(a int, b string) partitioned by (ds string) stored as RCFILE " +
+ "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+ "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ boolean errCaught = false;
+ try{
+ server.setBatchOn();
+ server.registerQuery("A = load '"+ fullFileName +"' as (a:int, b:chararray);");
+ server.registerQuery("B = foreach A generate a+10, b;");
+ server.registerQuery("store B into 'junit_parted' using "+HCatStorer.class.getName()+"('ds=20100101');");
+ server.executeBatch();
+ }
+ catch(PigException fe){
+ PigException pe = LogUtils.getPigException(fe);
+ assertTrue(pe instanceof FrontendException);
+ assertEquals(PigHCatUtil.PIG_EXCEPTION_CODE, pe.getErrorCode());
+ assertTrue(pe.getMessage().contains("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer."));
+ errCaught = true;
+ }
+ assertTrue(errCaught);
+ errCaught = false;
+ try{
+ server.setBatchOn();
+ server.registerQuery("A = load '"+ fullFileName +"' as (a:int, B:chararray);");
+ server.registerQuery("B = foreach A generate a, B;");
+ server.registerQuery("store B into 'junit_parted' using "+HCatStorer.class.getName()+"('ds=20100101');");
+ server.executeBatch();
+ }
+ catch(PigException fe){
+ PigException pe = LogUtils.getPigException(fe);
+ assertTrue(pe instanceof FrontendException);
+ assertEquals(PigHCatUtil.PIG_EXCEPTION_CODE, pe.getErrorCode());
+ assertTrue(pe.getMessage().contains("Column names should all be in lowercase. Invalid name found: B"));
+ errCaught = true;
+ }
+ driver.run("drop table junit_parted");
+ assertTrue(errCaught);
+ }
+
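+ // Stores two filtered relations into two different tables in a single Pig
+ // batch and verifies that the combined results match the original input.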
+ public void testStoreMultiTables() throws IOException{
+
+ driver.run("drop table junit_unparted");
+ String createTable = "create table junit_unparted(a int, b string) stored as RCFILE " +
+ "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+ "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+ driver.run("drop table junit_unparted2");
+ createTable = "create table junit_unparted2(a int, b string) stored as RCFILE " +
+ "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+ "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+ retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+
+ MiniCluster.deleteFile(cluster, fileName);
+ int LOOP_SIZE = 3;
+ String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+ int k = 0;
+ for(int i = 1; i <= LOOP_SIZE; i++) {
+ String si = i + "";
+ for(int j=1;j<=LOOP_SIZE;j++) {
+ input[k++] = si + "\t"+j;
+ }
+ }
+ MiniCluster.createInputFile(cluster, fileName, input);
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server.registerQuery("A = load '"+ fullFileName +"' as (a:int, b:chararray);");
+ server.registerQuery("B = filter A by a < 2;");
+ server.registerQuery("store B into 'junit_unparted' using "+HCatStorer.class.getName()+"();");
+ server.registerQuery("C = filter A by a >= 2;");
+ server.registerQuery("store C into 'junit_unparted2' using "+HCatStorer.class.getName()+"();");
+ server.executeBatch();
+ MiniCluster.deleteFile(cluster, fileName);
+
+ driver.run("select * from junit_unparted");
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ driver.run("select * from junit_unparted2");
+ ArrayList<String> res2 = new ArrayList<String>();
+ driver.getResults(res2);
+
+ res.addAll(res2);
+ driver.run("drop table junit_unparted");
+ driver.run("drop table junit_unparted2");
+
+ Iterator<String> itr = res.iterator();
+ for(int i = 0; i < LOOP_SIZE*LOOP_SIZE; i++) {
+ assertEquals( input[i] ,itr.next());
+ }
+
+ assertFalse(itr.hasNext());
+
+ }
+
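+ // Stores passing only an empty partition-spec argument and no schema string,
+ // then verifies the rows read back match the input.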
+ public void testStoreWithNoSchema() throws IOException{
+
+ driver.run("drop table junit_unparted");
+ String createTable = "create table junit_unparted(a int, b string) stored as RCFILE " +
+ "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+ "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+ MiniCluster.deleteFile(cluster, fileName);
+ int LOOP_SIZE = 3;
+ String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+ int k = 0;
+ for(int i = 1; i <= LOOP_SIZE; i++) {
+ String si = i + "";
+ for(int j=1;j<=LOOP_SIZE;j++) {
+ input[k++] = si + "\t"+j;
+ }
+ }
+ MiniCluster.createInputFile(cluster, fileName, input);
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server.registerQuery("A = load '"+ fullFileName +"' as (a:int, b:chararray);");
+ server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('');");
+ server.executeBatch();
+ MiniCluster.deleteFile(cluster, fileName);
+
+ driver.run("select * from junit_unparted");
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ driver.run("drop table junit_unparted");
+ Iterator<String> itr = res.iterator();
+ for(int i = 0; i < LOOP_SIZE*LOOP_SIZE; i++) {
+ assertEquals( input[i] ,itr.next());
+ }
+
+ assertFalse(itr.hasNext());
+
+ }
+
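+ // Stores using HCatStorer with no constructor arguments at all and verifies
+ // the stored rows match the input.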
+ public void testStoreWithNoCtorArgs() throws IOException{
+
+ driver.run("drop table junit_unparted");
+ String createTable = "create table junit_unparted(a int, b string) stored as RCFILE " +
+ "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+ "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+ MiniCluster.deleteFile(cluster, fileName);
+ int LOOP_SIZE = 3;
+ String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+ int k = 0;
+ for(int i = 1; i <= LOOP_SIZE; i++) {
+ String si = i + "";
+ for(int j=1;j<=LOOP_SIZE;j++) {
+ input[k++] = si + "\t"+j;
+ }
+ }
+ MiniCluster.createInputFile(cluster, fileName, input);
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server.registerQuery("A = load '"+ fullFileName +"' as (a:int, b:chararray);");
+ server.registerQuery("store A into 'junit_unparted' using "+HCatStorer.class.getName()+"();");
+ server.executeBatch();
+ MiniCluster.deleteFile(cluster, fileName);
+
+ driver.run("select * from junit_unparted");
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ driver.run("drop table junit_unparted");
+ Iterator<String> itr = res.iterator();
+ for(int i = 0; i < LOOP_SIZE*LOOP_SIZE; i++) {
+ assertEquals( input[i] ,itr.next());
+ }
+
+ assertFalse(itr.hasNext());
+
+ }
+
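+ // Stores a relation whose filter matches no rows and verifies that the
+ // target table stays empty.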
+ public void testEmptyStore() throws IOException{
+
+ driver.run("drop table junit_unparted");
+ String createTable = "create table junit_unparted(a int, b string) stored as RCFILE " +
+ "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+ "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+ MiniCluster.deleteFile(cluster, fileName);
+ int LOOP_SIZE = 3;
+ String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+ int k = 0;
+ for(int i = 1; i <= LOOP_SIZE; i++) {
+ String si = i + "";
+ for(int j=1;j<=LOOP_SIZE;j++) {
+ input[k++] = si + "\t"+j;
+ }
+ }
+ MiniCluster.createInputFile(cluster, fileName, input);
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server.registerQuery("A = load '"+fullFileName+"' as (a:int, b:chararray);");
+ server.registerQuery("B = filter A by a > 100;");
+ server.registerQuery("store B into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('','a:int,b:chararray');");
+ server.executeBatch();
+ MiniCluster.deleteFile(cluster, fileName);
+
+ driver.run("select * from junit_unparted");
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ driver.run("drop table junit_unparted");
+ Iterator<String> itr = res.iterator();
+ assertFalse(itr.hasNext());
+
+ }
+
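+ // Verifies that Pig tuples and bags of tuples are stored as Hive structs and
+ // arrays of structs, checking the string rendering returned by the Hive driver.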
+ public void testBagNStruct() throws IOException{
+ driver.run("drop table junit_unparted");
+ String createTable = "create table junit_unparted(b string,a struct<a1:int>, arr_of_struct array<string>, " +
+ "arr_of_struct2 array<struct<s1:string,s2:string>>, arr_of_struct3 array<struct<s3:string>>) stored as RCFILE " +
+ "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+ "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+
+ MiniCluster.deleteFile(cluster, fileName);
+ MiniCluster.createInputFile(cluster, fileName, new String[]{"zookeeper\t(2)\t{(pig)}\t{(pnuts,hdfs)}\t{(hadoop),(howl)}",
+ "chubby\t(2)\t{(sawzall)}\t{(bigtable,gfs)}\t{(mapreduce),(howl)}"});
+
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server.registerQuery("A = load '"+fullFileName+"' as (b:chararray, a:tuple(a1:int), arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)});");
+ server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('','b:chararray, a:tuple(a1:int)," +
+ " arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)}');");
+ server.executeBatch();
+
+
+
+ MiniCluster.deleteFile(cluster, fileName);
+
+ driver.run("select * from junit_unparted");
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ driver.run("drop table junit_unparted");
+ Iterator<String> itr = res.iterator();
+ assertEquals("zookeeper\t{\"a1\":2}\t[\"pig\"]\t[{\"s1\":\"pnuts\",\"s2\":\"hdfs\"}]\t[{\"s3\":\"hadoop\"},{\"s3\":\"howl\"}]", itr.next());
+ assertEquals("chubby\t{\"a1\":2}\t[\"sawzall\"]\t[{\"s1\":\"bigtable\",\"s2\":\"gfs\"}]\t[{\"s3\":\"mapreduce\"},{\"s3\":\"howl\"}]",itr.next());
+ assertFalse(itr.hasNext());
+
+ }
+
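+ // Covers all simple column types (int, float, double, bigint, string) in a
+ // single store and verifies the rows read back through the Hive driver.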
+ public void testStoreFuncAllSimpleTypes() throws IOException{
+
+ driver.run("drop table junit_unparted");
+ String createTable = "create table junit_unparted(a int, b float, c double, d bigint, e string) stored as RCFILE " +
+ "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+ "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+ MiniCluster.deleteFile(cluster, fileName);
+ int LOOP_SIZE = 3;
+ String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+ for(int i = 0; i < LOOP_SIZE*LOOP_SIZE; i++) {
+ input[i] = i + "\t" + i * 2.1f +"\t"+ i*1.1d + "\t" + i * 2L +"\t"+"lets howl";
+ }
+
+ MiniCluster.createInputFile(cluster, fileName, input);
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server.registerQuery("A = load '"+fullFileName+"' as (a:int, b:float, c:double, d:long, e:chararray);");
+ server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('','a:int, b:float, c:double, d:long, e:chararray');");
+ server.executeBatch();
+ MiniCluster.deleteFile(cluster, fileName);
+
+ driver.run("select * from junit_unparted");
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+
+ Iterator<String> itr = res.iterator();
+ for(int i = 0; i < LOOP_SIZE*LOOP_SIZE; i++) {
+ assertEquals( input[i] ,itr.next());
+ }
+
+ assertFalse(itr.hasNext());
+ driver.run("drop table junit_unparted");
+ }
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ MiniCluster.deleteFile(cluster, fileName);
+ }
+
+
+
+
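+ // Basic two-column store into an unpartitioned table, verified by reading
+ // the rows back through the Hive driver.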
+ public void testStoreFuncSimple() throws IOException{
+
+ driver.run("drop table junit_unparted");
+ String createTable = "create table junit_unparted(a int, b string) stored as RCFILE " +
+ "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+ "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table.");
+ }
+ MiniCluster.deleteFile(cluster, fileName);
+ int LOOP_SIZE = 3;
+ String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+ int k = 0;
+ for(int i = 1; i <= LOOP_SIZE; i++) {
+ String si = i + "";
+ for(int j=1;j<=LOOP_SIZE;j++) {
+ input[k++] = si + "\t"+j;
+ }
+ }
+ MiniCluster.createInputFile(cluster, fileName, input);
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server.registerQuery("A = load '"+fullFileName+"' as (a:int, b:chararray);");
+ server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('','a:int,b:chararray');");
+ server.executeBatch();
+ MiniCluster.deleteFile(cluster, fileName);
+
+ driver.run("select * from junit_unparted");
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ driver.run("drop table junit_unparted");
+ Iterator<String> itr = res.iterator();
+ for(int i = 1; i <= LOOP_SIZE; i++) {
+ String si = i + "";
+ for(int j=1;j<=LOOP_SIZE;j++) {
+ assertEquals( si + "\t"+j,itr.next());
+ }
+ }
+ assertFalse(itr.hasNext());
+
+ }
+}
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorerMulti.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorerMulti.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorerMulti.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorerMulti.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.data.Pair;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.util.UDFContext;
+
+public class TestHCatStorerMulti extends TestCase {
+
+ private static final String BASIC_TABLE = "junit_unparted_basic";
+ private static final String PARTITIONED_TABLE = "junit_parted_basic";
+ private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static Driver driver;
+
+ private static final String basicFile = "/tmp/basic.input.data";
+ private static String basicFileFullName;
+ private static Properties props;
+
+ private static Map<Integer,Pair<Integer,String>> basicInputData;
+
+ private void dropTable(String tablename) throws IOException{
+ driver.run("drop table "+tablename);
+ }
+ private void createTable(String tablename, String schema, String partitionedBy) throws IOException{
+ String createTable;
+ createTable = "create table "+tablename+"("+schema+") ";
+ if ((partitionedBy != null)&&(!partitionedBy.trim().isEmpty())){
+ createTable = createTable + "partitioned by ("+partitionedBy+") ";
+ }
+ createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
+ "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table. ["+createTable+"], return code from hive driver : ["+retCode+"]");
+ }
+ }
+
+ private void createTable(String tablename, String schema) throws IOException{
+ createTable(tablename,schema,null);
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ if (driver == null){
+ HiveConf hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ driver = new Driver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+ }
+
+ props = new Properties();
+ props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+ basicFileFullName = cluster.getProperties().getProperty("fs.default.name") + basicFile;
+
+ cleanup();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ cleanup();
+ }
+
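+ // Stores the input into the unpartitioned table only and compares row counts.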
+ public void testStoreBasicTable() throws Exception {
+
+
+ createTable(BASIC_TABLE,"a int, b string");
+
+ populateBasicFile();
+
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server.registerQuery("A = load '"+basicFileFullName+"' as (a:int, b:chararray);");
+ server.registerQuery("store A into '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
+
+ server.executeBatch();
+
+ driver.run("select * from "+BASIC_TABLE);
+ ArrayList<String> unpartitionedTableValuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(unpartitionedTableValuesReadFromHiveDriver);
+ assertEquals(basicInputData.size(),unpartitionedTableValuesReadFromHiveDriver.size());
+ }
+
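+ // Splits the input on column 'a' and stores the halves into partitions
+ // bkt=0 and bkt=1 of the partitioned table, then compares row counts.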
+ public void testStorePartitionedTable() throws Exception {
+ createTable(PARTITIONED_TABLE,"a int, b string","bkt string");
+
+ populateBasicFile();
+
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server.registerQuery("A = load '"+basicFileFullName+"' as (a:int, b:chararray);");
+
+ server.registerQuery("B2 = filter A by a < 2;");
+ server.registerQuery("store B2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=0');");
+ server.registerQuery("C2 = filter A by a >= 2;");
+ server.registerQuery("store C2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=1');");
+
+ server.executeBatch();
+
+ driver.run("select * from "+PARTITIONED_TABLE);
+ ArrayList<String> partitionedTableValuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(partitionedTableValuesReadFromHiveDriver);
+ assertEquals(basicInputData.size(),partitionedTableValuesReadFromHiveDriver.size());
+ }
+
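+ // Stores into both the unpartitioned and the partitioned table in a single
+ // Pig batch and compares row counts for each.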
+ public void testStoreTableMulti() throws Exception {
+
+
+ createTable(BASIC_TABLE,"a int, b string");
+ createTable(PARTITIONED_TABLE,"a int, b string","bkt string");
+
+ populateBasicFile();
+
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server.registerQuery("A = load '"+basicFileFullName+"' as (a:int, b:chararray);");
+ server.registerQuery("store A into '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
+
+ server.registerQuery("B2 = filter A by a < 2;");
+ server.registerQuery("store B2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=0');");
+ server.registerQuery("C2 = filter A by a >= 2;");
+ server.registerQuery("store C2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=1');");
+
+ server.executeBatch();
+
+ driver.run("select * from "+BASIC_TABLE);
+ ArrayList<String> unpartitionedTableValuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(unpartitionedTableValuesReadFromHiveDriver);
+ driver.run("select * from "+PARTITIONED_TABLE);
+ ArrayList<String> partitionedTableValuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(partitionedTableValuesReadFromHiveDriver);
+ assertEquals(basicInputData.size(),unpartitionedTableValuesReadFromHiveDriver.size());
+ assertEquals(basicInputData.size(),partitionedTableValuesReadFromHiveDriver.size());
+ }
+
+ private void populateBasicFile() throws IOException {
+ int LOOP_SIZE = 3;
+ String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+ basicInputData = new HashMap<Integer,Pair<Integer,String>>();
+ int k = 0;
+ for(int i = 1; i <= LOOP_SIZE; i++) {
+ String si = i + "";
+ for(int j=1;j<=LOOP_SIZE;j++) {
+ String sj = "S"+j+"S";
+ input[k] = si + "\t" + sj;
+ basicInputData.put(k, new Pair<Integer,String>(i,sj));
+ k++;
+ }
+ }
+ MiniCluster.createInputFile(cluster, basicFile, input);
+ }
+
+ private void cleanup() throws IOException {
+ MiniCluster.deleteFile(cluster, basicFile);
+ dropTable(BASIC_TABLE);
+ dropTable(PARTITIONED_TABLE);
+ }
+}
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPermsInheritance.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPermsInheritance.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPermsInheritance.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPermsInheritance.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hcatalog.ExitException;
+import org.apache.hcatalog.NoExitSecurityManager;
+import org.apache.hcatalog.cli.HCatCli;
+import org.apache.hcatalog.pig.HCatStorer;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.thrift.TException;
+
+public class TestPermsInheritance extends TestCase {
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ securityManager = System.getSecurityManager();
+ System.setSecurityManager(new NoExitSecurityManager());
+ msc = new HiveMetaStoreClient(conf);
+ msc.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME,"testNoPartTbl", true,true);
+ System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+ System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+ msc.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME,"testPartTbl", true,true);
+ pig = new PigServer(ExecType.LOCAL, conf.getAllProperties());
+ UDFContext.getUDFContext().setClientSystemProps();
+ }
+
+ private HiveMetaStoreClient msc;
+ private SecurityManager securityManager;
+ private PigServer pig;
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ System.setSecurityManager(securityManager);
+ }
+
+ private final HiveConf conf = new HiveConf(this.getClass());
+
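+ // Creates tables through HCatCli with explicit permissions (-p) and verifies
+ // that files and partition directories written by HCatStorer inherit them.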
+ public void testNoPartTbl() throws IOException, MetaException, UnknownTableException, TException, NoSuchObjectException{
+
+ try{
+ HCatCli.main(new String[]{"-e","create table testNoPartTbl (line string) stored as RCFILE", "-p","rwx-wx---"});
+ }
+ catch(Exception e){
+ assertTrue(e instanceof ExitException);
+ assertEquals(((ExitException)e).getStatus(), 0);
+ }
+ Warehouse wh = new Warehouse(conf);
+ Path dfsPath = wh.getDefaultTablePath(MetaStoreUtils.DEFAULT_DATABASE_NAME, "testNoPartTbl");
+ FileSystem fs = dfsPath.getFileSystem(conf);
+ assertEquals(fs.getFileStatus(dfsPath).getPermission(),FsPermission.valueOf("drwx-wx---"));
+
+ pig.setBatchOn();
+ pig.registerQuery("A = load 'build.xml' as (line:chararray);");
+ pig.registerQuery("store A into 'testNoPartTbl' using "+HCatStorer.class.getName()+"();");
+ pig.executeBatch();
+ FileStatus[] status = fs.listStatus(dfsPath,hiddenFileFilter);
+
+ assertEquals(status.length, 1);
+ assertEquals(FsPermission.valueOf("drwx-wx---"),status[0].getPermission());
+
+ try{
+ HCatCli.main(new String[]{"-e","create table testPartTbl (line string) partitioned by (a string) stored as RCFILE", "-p","rwx-wx--x"});
+ }
+ catch(Exception e){
+ assertTrue(e instanceof ExitException);
+ assertEquals(((ExitException)e).getStatus(), 0);
+ }
+
+ dfsPath = wh.getDefaultTablePath(MetaStoreUtils.DEFAULT_DATABASE_NAME, "testPartTbl");
+ assertEquals(fs.getFileStatus(dfsPath).getPermission(),FsPermission.valueOf("drwx-wx--x"));
+
+ pig.setBatchOn();
+ pig.registerQuery("A = load 'build.xml' as (line:chararray);");
+ pig.registerQuery("store A into 'testPartTbl' using "+HCatStorer.class.getName()+"('a=part');");
+ pig.executeBatch();
+
+ Path partPath = new Path(dfsPath,"a=part");
+ assertEquals(FsPermission.valueOf("drwx-wx--x"),fs.getFileStatus(partPath).getPermission());
+ status = fs.listStatus(partPath,hiddenFileFilter);
+ assertEquals(status.length, 1);
+ assertEquals(FsPermission.valueOf("drwx-wx--x"),status[0].getPermission());
+ }
+
+ private static final PathFilter hiddenFileFilter = new PathFilter(){
+ public boolean accept(Path p){
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+}
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.pig.HCatLoader;
+import org.apache.hcatalog.pig.drivers.PigStorageInputDriver;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.thrift.TException;
+
+public class TestPigStorageDriver extends TestCase {
+
+ private HiveConf howlConf;
+ private Driver howlDriver;
+ private HiveMetaStoreClient msc;
+
+ @Override
+ protected void setUp() throws Exception {
+
+ howlConf = new HiveConf(this.getClass());
+ howlConf.set(ConfVars.PREEXECHOOKS.varname, "");
+ howlConf.set(ConfVars.POSTEXECHOOKS.varname, "");
+ howlConf.set(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ howlConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName());
+ howlDriver = new Driver(howlConf);
+ msc = new HiveMetaStoreClient(howlConf);
+ SessionState.start(new CliSessionState(howlConf));
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
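+ // Registers a plain text file as a partition backed by PigStorageInputDriver
+ // and verifies HCatLoader returns one tuple per line plus the partition value.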
+ public void testPigStorageDriver() throws IOException{
+
+
+ String fsLoc = howlConf.get("fs.default.name");
+ Path tblPath = new Path(fsLoc, "/tmp/test_pig/data");
+ String anyExistingFileInCurDir = "ivy.xml";
+ tblPath.getFileSystem(howlConf).copyFromLocalFile(new Path(anyExistingFileInCurDir),tblPath);
+
+ howlDriver.run("drop table junit_pigstorage");
+ CommandProcessorResponse resp;
+ String createTable = "create table junit_pigstorage (a string) partitioned by (b string) stored as RCFILE";
+
+ resp = howlDriver.run(createTable);
+ assertEquals(0, resp.getResponseCode());
+ assertNull(resp.getErrorMessage());
+
+ resp = howlDriver.run("alter table junit_pigstorage add partition (b='2010-10-10') location '"+new Path(fsLoc, "/tmp/test_pig")+"'");
+ assertEquals(0, resp.getResponseCode());
+ assertNull(resp.getErrorMessage());
+
+ resp = howlDriver.run("alter table junit_pigstorage partition (b='2010-10-10') set fileformat inputformat '" + RCFileInputFormat.class.getName()
+ +"' outputformat '"+RCFileOutputFormat.class.getName()+"' inputdriver '"+PigStorageInputDriver.class.getName()+"' outputdriver 'non-existent'");
+ assertEquals(0, resp.getResponseCode());
+ assertNull(resp.getErrorMessage());
+
+ resp = howlDriver.run("desc extended junit_pigstorage partition (b='2010-10-10')");
+ assertEquals(0, resp.getResponseCode());
+ assertNull(resp.getErrorMessage());
+
+ PigServer server = new PigServer(ExecType.LOCAL, howlConf.getAllProperties());
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.registerQuery(" a = load 'junit_pigstorage' using "+HCatLoader.class.getName()+";");
+ Iterator<Tuple> itr = server.openIterator("a");
+ DataInputStream stream = new DataInputStream(new BufferedInputStream(new FileInputStream(new File(anyExistingFileInCurDir))));
+ while(itr.hasNext()){
+ Tuple t = itr.next();
+ assertEquals(2, t.size());
+ if(t.get(0) != null) {
+ // If the underlying data field is empty, PigStorage inserts null
+ // instead of empty String objects.
+ assertTrue(t.get(0) instanceof String);
+ assertEquals(stream.readLine(), t.get(0));
+ }
+ else{
+ assertTrue(stream.readLine().isEmpty());
+ }
+ assertTrue(t.get(1) instanceof String);
+
+ assertEquals("2010-10-10", t.get(1));
+ }
+ assertEquals(0,stream.available());
+ stream.close();
+ howlDriver.run("drop table junit_pigstorage");
+ }
+
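+ // Sets a custom field delimiter on the partition's parameters and attempts a
+ // load; any FrontendException raised while opening the iterator is tolerated.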
+ public void testDelim() throws MetaException, TException, UnknownTableException, NoSuchObjectException, InvalidOperationException, IOException{
+
+ howlDriver.run("drop table junit_pigstorage_delim");
+
+ CommandProcessorResponse resp;
+ String createTable = "create table junit_pigstorage_delim (a string) partitioned by (b string) stored as RCFILE";
+
+ resp = howlDriver.run(createTable);
+
+ assertEquals(0, resp.getResponseCode());
+ assertNull(resp.getErrorMessage());
+
+ resp = howlDriver.run("alter table junit_pigstorage_delim add partition (b='2010-10-10')");
+ assertEquals(0, resp.getResponseCode());
+ assertNull(resp.getErrorMessage());
+
+ resp = howlDriver.run("alter table junit_pigstorage_delim partition (b='2010-10-10') set fileformat inputformat '" + RCFileInputFormat.class.getName()
+ +"' outputformat '"+RCFileOutputFormat.class.getName()+"' inputdriver '"+MyPigStorageDriver.class.getName()+"' outputdriver 'non-existent'");
+
+ Partition part = msc.getPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, "junit_pigstorage_delim", "b=2010-10-10");
+ Map<String,String> partParms = part.getParameters();
+ partParms.put(PigStorageInputDriver.delim, "control-A");
+
+ msc.alter_partition(MetaStoreUtils.DEFAULT_DATABASE_NAME, "junit_pigstorage_delim", part);
+
+ PigServer server = new PigServer(ExecType.LOCAL, howlConf.getAllProperties());
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.registerQuery(" a = load 'junit_pigstorage_delim' using "+HCatLoader.class.getName()+";");
+ try{
+ server.openIterator("a");
+ }catch(FrontendException fe){}
+ }
+}
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.rcfile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.rcfile.RCFileInputDriver;
+
+public class TestRCFileInputStorageDriver extends TestCase{
+
+ private static Configuration conf = new Configuration();
+
+ private static Path file;
+
+ private static FileSystem fs;
+
+ static {
+
+ try {
+ fs = FileSystem.getLocal(conf);
+ Path dir = new Path(System.getProperty("test.data.dir", ".") + "/mapred");
+ file = new Path(dir, "test_rcfile");
+ fs.delete(dir, true);
+ } catch (Exception e) {
+ }
+ }
+
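+ // Writes two rows to an RCFile and checks that RCFileInputDriver converts
+ // each BytesRefArrayWritable into the expected 8-field HCatRecord.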
+ public void testConvertValueToTuple() throws IOException,InterruptedException{
+ fs.delete(file, true);
+
+ byte[][] record_1 = {"123".getBytes("UTF-8"), "456".getBytes("UTF-8"),
+ "789".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+ "5.3".getBytes("UTF-8"), "howl and hadoop".getBytes("UTF-8"),
+ new byte[0], "\\N".getBytes("UTF-8")};
+ byte[][] record_2 = {"100".getBytes("UTF-8"), "200".getBytes("UTF-8"),
+ "123".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+ "5.3".getBytes("UTF-8"), "howl and hadoop".getBytes("UTF-8"),
+ new byte[0], "\\N".getBytes("UTF-8")};
+
+ RCFileOutputFormat.setColumnNumber(conf, 8);
+ RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null,
+ new DefaultCodec());
+ BytesRefArrayWritable bytes = new BytesRefArrayWritable(record_1.length);
+ for (int i = 0; i < record_1.length; i++) {
+ BytesRefWritable cu = new BytesRefWritable(record_1[i], 0,
+ record_1[i].length);
+ bytes.set(i, cu);
+ }
+ writer.append(bytes);
+ BytesRefArrayWritable bytes2 = new BytesRefArrayWritable(record_2.length);
+ for (int i = 0; i < record_2.length; i++) {
+ BytesRefWritable cu = new BytesRefWritable(record_2[i], 0,
+ record_2[i].length);
+ bytes2.set(i, cu);
+ }
+ writer.append(bytes2);
+ writer.close();
+ BytesRefArrayWritable[] bytesArr = new BytesRefArrayWritable[]{bytes,bytes2};
+
+ HCatSchema schema = buildHiveSchema();
+ RCFileInputDriver sd = new RCFileInputDriver();
+ JobContext jc = new JobContext(conf, new JobID());
+ sd.setInputPath(jc, file.toString());
+ InputFormat<?,?> iF = sd.getInputFormat(null);
+ InputSplit split = iF.getSplits(jc).get(0);
+ sd.setOriginalSchema(jc, schema);
+ sd.setOutputSchema(jc, schema);
+ sd.initialize(jc, getProps());
+
+ TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+ RecordReader<?,?> rr = iF.createRecordReader(split,tac);
+ rr.initialize(split, tac);
+ HCatRecord[] tuples = getExpectedRecords();
+ for(int j=0; j < 2; j++){
+ Assert.assertTrue(rr.nextKeyValue());
+ BytesRefArrayWritable w = (BytesRefArrayWritable)rr.getCurrentValue();
+ Assert.assertEquals(bytesArr[j], w);
+ HCatRecord t = sd.convertToHCatRecord(null,w);
+ Assert.assertEquals(8, t.size());
+ Assert.assertEquals(t,tuples[j]);
+ }
+ }
+
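+ // Uses an output schema with fewer columns than the file to exercise column
+ // pruning, expecting 5-field records built from the 8-column rows.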
+ public void testPruning() throws IOException,InterruptedException{
+ fs.delete(file, true);
+
+ byte[][] record_1 = {"123".getBytes("UTF-8"), "456".getBytes("UTF-8"),
+ "789".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+ "5.3".getBytes("UTF-8"), "howl and hadoop".getBytes("UTF-8"),
+ new byte[0], "\\N".getBytes("UTF-8")};
+ byte[][] record_2 = {"100".getBytes("UTF-8"), "200".getBytes("UTF-8"),
+ "123".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+ "5.3".getBytes("UTF-8"), "howl and hadoop".getBytes("UTF-8"),
+ new byte[0], "\\N".getBytes("UTF-8")};
+
+ RCFileOutputFormat.setColumnNumber(conf, 8);
+ RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null,
+ new DefaultCodec());
+ BytesRefArrayWritable bytes = new BytesRefArrayWritable(record_1.length);
+ for (int i = 0; i < record_1.length; i++) {
+ BytesRefWritable cu = new BytesRefWritable(record_1[i], 0,
+ record_1[i].length);
+ bytes.set(i, cu);
+ }
+ writer.append(bytes);
+ BytesRefArrayWritable bytes2 = new BytesRefArrayWritable(record_2.length);
+ for (int i = 0; i < record_2.length; i++) {
+ BytesRefWritable cu = new BytesRefWritable(record_2[i], 0,
+ record_2[i].length);
+ bytes2.set(i, cu);
+ }
+ writer.append(bytes2);
+ writer.close();
+ BytesRefArrayWritable[] bytesArr = new BytesRefArrayWritable[]{bytes,bytes2};
+
+ RCFileInputDriver sd = new RCFileInputDriver();
+ JobContext jc = new JobContext(conf, new JobID());
+ sd.setInputPath(jc, file.toString());
+ InputFormat<?,?> iF = sd.getInputFormat(null);
+ InputSplit split = iF.getSplits(jc).get(0);
+ sd.setOriginalSchema(jc, buildHiveSchema());
+ sd.setOutputSchema(jc, buildPrunedSchema());
+
+ sd.initialize(jc, getProps());
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,jc.getConfiguration().get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+ TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+ RecordReader<?,?> rr = iF.createRecordReader(split,tac);
+ rr.initialize(split, tac);
+ HCatRecord[] tuples = getPrunedRecords();
+ for(int j=0; j < 2; j++){
+ Assert.assertTrue(rr.nextKeyValue());
+ BytesRefArrayWritable w = (BytesRefArrayWritable)rr.getCurrentValue();
+ Assert.assertFalse(bytesArr[j].equals(w));
+ Assert.assertEquals(w.size(), 8);
+ HCatRecord t = sd.convertToHCatRecord(null,w);
+ Assert.assertEquals(5, t.size());
+ Assert.assertEquals(t,tuples[j]);
+ }
+ assertFalse(rr.nextKeyValue());
+ }
+
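+ // Uses a reordered output schema that also adds a partition column and a new
+ // column, expecting 7-field records with the partition value and a null filled in.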
+ public void testReorderedCols() throws IOException,InterruptedException{
+ fs.delete(file, true);
+
+ byte[][] record_1 = {"123".getBytes("UTF-8"), "456".getBytes("UTF-8"),
+ "789".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+ "5.3".getBytes("UTF-8"), "howl and hadoop".getBytes("UTF-8"),
+ new byte[0], "\\N".getBytes("UTF-8")};
+ byte[][] record_2 = {"100".getBytes("UTF-8"), "200".getBytes("UTF-8"),
+ "123".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+ "5.3".getBytes("UTF-8"), "howl and hadoop".getBytes("UTF-8"),
+ new byte[0], "\\N".getBytes("UTF-8")};
+
+ RCFileOutputFormat.setColumnNumber(conf, 8);
+ RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null,
+ new DefaultCodec());
+ BytesRefArrayWritable bytes = new BytesRefArrayWritable(record_1.length);
+ for (int i = 0; i < record_1.length; i++) {
+ BytesRefWritable cu = new BytesRefWritable(record_1[i], 0,
+ record_1[i].length);
+ bytes.set(i, cu);
+ }
+ writer.append(bytes);
+ BytesRefArrayWritable bytes2 = new BytesRefArrayWritable(record_2.length);
+ for (int i = 0; i < record_2.length; i++) {
+ BytesRefWritable cu = new BytesRefWritable(record_2[i], 0,
+ record_2[i].length);
+ bytes2.set(i, cu);
+ }
+ writer.append(bytes2);
+ writer.close();
+ BytesRefArrayWritable[] bytesArr = new BytesRefArrayWritable[]{bytes,bytes2};
+
+ RCFileInputDriver sd = new RCFileInputDriver();
+ JobContext jc = new JobContext(conf, new JobID());
+ sd.setInputPath(jc, file.toString());
+ InputFormat<?,?> iF = sd.getInputFormat(null);
+ InputSplit split = iF.getSplits(jc).get(0);
+ sd.setOriginalSchema(jc, buildHiveSchema());
+ sd.setOutputSchema(jc, buildReorderedSchema());
+
+ sd.initialize(jc, getProps());
+ Map<String,String> map = new HashMap<String,String>(1);
+ map.put("part1", "first-part");
+ sd.setPartitionValues(jc, map);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,jc.getConfiguration().get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+ TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+ RecordReader<?,?> rr = iF.createRecordReader(split,tac);
+ rr.initialize(split, tac);
+ HCatRecord[] tuples = getReorderedCols();
+ for(int j=0; j < 2; j++){
+ Assert.assertTrue(rr.nextKeyValue());
+ BytesRefArrayWritable w = (BytesRefArrayWritable)rr.getCurrentValue();
+ Assert.assertFalse(bytesArr[j].equals(w));
+ Assert.assertEquals(w.size(), 8);
+ HCatRecord t = sd.convertToHCatRecord(null,w);
+ Assert.assertEquals(7, t.size());
+ Assert.assertEquals(tuples[j], t);
+ }
+ assertFalse(rr.nextKeyValue());
+ }
+
+ private HCatRecord[] getExpectedRecords(){
+
+ List<Object> rec_1 = new ArrayList<Object>(8);
+ rec_1.add(new Byte("123"));
+ rec_1.add(new Short("456"));
+ rec_1.add( new Integer(789));
+ rec_1.add( new Long(1000L));
+ rec_1.add( new Double(5.3D));
+ rec_1.add( new String("howl and hadoop"));
+ rec_1.add( null);
+ rec_1.add( null);
+
+ HCatRecord tup_1 = new DefaultHCatRecord(rec_1);
+
+ List<Object> rec_2 = new ArrayList<Object>(8);
+ rec_2.add( new Byte("100"));
+ rec_2.add( new Short("200"));
+ rec_2.add( new Integer(123));
+ rec_2.add( new Long(1000L));
+ rec_2.add( new Double(5.3D));
+ rec_2.add( new String("howl and hadoop"));
+ rec_2.add( null);
+ rec_2.add( null);
+ HCatRecord tup_2 = new DefaultHCatRecord(rec_2);
+
+ return new HCatRecord[]{tup_1,tup_2};
+
+ }
+
+ private HCatRecord[] getPrunedRecords(){
+
+ List<Object> rec_1 = new ArrayList<Object>(5);
+ rec_1.add(new Byte("123"));
+ rec_1.add( new Integer(789));
+ rec_1.add( new Double(5.3D));
+ rec_1.add( new String("howl and hadoop"));
+ rec_1.add( null);
+ HCatRecord tup_1 = new DefaultHCatRecord(rec_1);
+
+ List<Object> rec_2 = new ArrayList<Object>(5);
+ rec_2.add( new Byte("100"));
+ rec_2.add( new Integer(123));
+ rec_2.add( new Double(5.3D));
+ rec_2.add( new String("howl and hadoop"));
+ rec_2.add( null);
+ HCatRecord tup_2 = new DefaultHCatRecord(rec_2);
+
+ return new HCatRecord[]{tup_1,tup_2};
+
+ }
+
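+ /**
+ * Full eight-column schema matching the rows written above; buildPrunedSchema() selects
+ * five of these columns, and buildReorderedSchema() permutes them while adding the
+ * partition column and a brand-new column.
+ */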
+ private HCatSchema buildHiveSchema() throws HCatException{
+
+ List<FieldSchema> fields = new ArrayList<FieldSchema>(8);
+ fields.add(new FieldSchema("atinyint", "tinyint", ""));
+ fields.add(new FieldSchema("asmallint", "smallint", ""));
+ fields.add(new FieldSchema("aint", "int", ""));
+ fields.add(new FieldSchema("along", "bigint", ""));
+ fields.add(new FieldSchema("adouble", "double", ""));
+ fields.add(new FieldSchema("astring", "string", ""));
+ fields.add(new FieldSchema("anullint", "int", ""));
+ fields.add(new FieldSchema("anullstring", "string", ""));
+
+ return new HCatSchema(HCatUtil.getHCatFieldSchemaList(fields));
+ }
+
+ private HCatSchema buildPrunedSchema() throws HCatException{
+
+ List<FieldSchema> fields = new ArrayList<FieldSchema>(5);
+ fields.add(new FieldSchema("atinyint", "tinyint", ""));
+ fields.add(new FieldSchema("aint", "int", ""));
+ fields.add(new FieldSchema("adouble", "double", ""));
+ fields.add(new FieldSchema("astring", "string", ""));
+ fields.add(new FieldSchema("anullint", "int", ""));
+
+ return new HCatSchema(HCatUtil.getHCatFieldSchemaList(fields));
+ }
+
+ private HCatSchema buildReorderedSchema() throws HCatException{
+
+ List<FieldSchema> fields = new ArrayList<FieldSchema>(7);
+
+ fields.add(new FieldSchema("aint", "int", ""));
+ fields.add(new FieldSchema("part1", "string", ""));
+ fields.add(new FieldSchema("adouble", "double", ""));
+ fields.add(new FieldSchema("newCol", "tinyint", ""));
+ fields.add(new FieldSchema("astring", "string", ""));
+ fields.add(new FieldSchema("atinyint", "tinyint", ""));
+ fields.add(new FieldSchema("anullint", "int", ""));
+
+ return new HCatSchema(HCatUtil.getHCatFieldSchemaList(fields));
+ }
+
+ private HCatRecord[] getReorderedCols(){
+
+ List<Object> rec_1 = new ArrayList<Object>(7);
+ rec_1.add( new Integer(789));
+ rec_1.add( new String("first-part"));
+ rec_1.add( new Double(5.3D));
+ rec_1.add( null); // new column
+ rec_1.add( new String("howl and hadoop"));
+ rec_1.add( new Byte("123"));
+ rec_1.add( null);
+ HCatRecord tup_1 = new DefaultHCatRecord(rec_1);
+
+ List<Object> rec_2 = new ArrayList<Object>(7);
+
+ rec_2.add( new Integer(123));
+ rec_2.add( new String("first-part"));
+ rec_2.add( new Double(5.3D));
+ rec_2.add(null);
+ rec_2.add( new String("howl and hadoop"));
+ rec_2.add( new Byte("100"));
+ rec_2.add( null);
+ HCatRecord tup_2 = new DefaultHCatRecord(rec_2);
+
+ return new HCatRecord[]{tup_1,tup_2};
+
+ }
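+
+ /**
+ * SerDe properties handed to the storage driver: "\\N" as the null marker and
+ * serialization format "9" (a tab field delimiter).
+ */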
+ private Properties getProps(){
+ Properties props = new Properties();
+ props.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "\\N");
+ props.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+ return props;
+ }
+}
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.rcfile;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hcatalog.rcfile.RCFileMapReduceInputFormat;
+
+/**
+ * Tests RCFileMapReduceInputFormat, the new-API (mapreduce) InputFormat for RCFile.
+ */
+public class TestRCFileMapReduceInputFormat extends TestCase {
+
+ private static final Log LOG = LogFactory.getLog(TestRCFileMapReduceInputFormat.class);
+
+ private static Configuration conf = new Configuration();
+
+ private static ColumnarSerDe serDe;
+
+ private static Path file;
+
+ private static FileSystem fs;
+
+ private static Properties tbl;
+
+ static {
+ try {
+ fs = FileSystem.getLocal(conf);
+ Path dir = new Path(System.getProperty("test.data.dir", ".") + "/mapred");
+ file = new Path(dir, "test_rcfile");
+ fs.delete(dir, true);
+ // the SerDe part is from TestLazySimpleSerDe
+ serDe = new ColumnarSerDe();
+ // Create the SerDe
+ tbl = createProperties();
+ serDe.initialize(conf, tbl);
+ } catch (Exception e) {
+ // Fail fast: silently swallowing a setup failure would only surface later as NullPointerExceptions.
+ throw new RuntimeException("Could not initialize RCFile test fixtures", e);
+ }
+ }
+
+ private static BytesRefArrayWritable partialS = new BytesRefArrayWritable();
+
+ private static byte[][] bytesArray = null;
+
+ private static BytesRefArrayWritable s = null;
+ static {
+ try {
+ bytesArray = new byte[][] {"123".getBytes("UTF-8"),
+ "456".getBytes("UTF-8"), "789".getBytes("UTF-8"),
+ "1000".getBytes("UTF-8"), "5.3".getBytes("UTF-8"),
+ "hive and hadoop".getBytes("UTF-8"), new byte[0],
+ "NULL".getBytes("UTF-8")};
+ s = new BytesRefArrayWritable(bytesArray.length);
+ s.set(0, new BytesRefWritable("123".getBytes("UTF-8")));
+ s.set(1, new BytesRefWritable("456".getBytes("UTF-8")));
+ s.set(2, new BytesRefWritable("789".getBytes("UTF-8")));
+ s.set(3, new BytesRefWritable("1000".getBytes("UTF-8")));
+ s.set(4, new BytesRefWritable("5.3".getBytes("UTF-8")));
+ s.set(5, new BytesRefWritable("hive and hadoop".getBytes("UTF-8")));
+ s.set(6, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ s.set(7, new BytesRefWritable("NULL".getBytes("UTF-8")));
+
+ // partial-row fixture: most columns set to the null marker
+ partialS.set(0, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ partialS.set(1, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ partialS.set(2, new BytesRefWritable("789".getBytes("UTF-8")));
+ partialS.set(3, new BytesRefWritable("1000".getBytes("UTF-8")));
+ partialS.set(4, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ partialS.set(5, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ partialS.set(6, new BytesRefWritable("NULL".getBytes("UTF-8")));
+ partialS.set(7, new BytesRefWritable("NULL".getBytes("UTF-8")));
+
+ } catch (UnsupportedEncodingException e) {
+ // UTF-8 is guaranteed to be present; rethrow instead of leaving the fixtures half-initialized.
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ /** For debugging and testing. */
+ public static void main(String[] args) throws Exception {
+ int count = 10000;
+ boolean create = true;
+
+ String usage = "Usage: TestRCFileMapReduceInputFormat [-count N] file";
+ if (args.length == 0) {
+ System.err.println(usage);
+ System.exit(-1);
+ }
+
+ try {
+ for (int i = 0; i < args.length; ++i) { // parse command line
+ if (args[i] == null) {
+ continue;
+ } else if (args[i].equals("-count")) {
+ count = Integer.parseInt(args[++i]);
+ } else {
+ // file is required parameter
+ file = new Path(args[i]);
+ }
+ }
+
+ if (file == null) {
+ System.err.println(usage);
+ System.exit(-1);
+ }
+
+ LOG.info("count = " + count);
+ LOG.info("create = " + create);
+ LOG.info("file = " + file);
+
+ // test.performanceTest();
+ System.out.println("Finished.");
+ } finally {
+ fs.close();
+ }
+ }
+
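+ /**
+ * Table properties for the ColumnarSerDe: eight columns covering the primitive types,
+ * serialization format "9" (a tab field delimiter) and "NULL" as the null marker.
+ */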
+ private static Properties createProperties() {
+ Properties tbl = new Properties();
+
+ // Set the configuration parameters
+ tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+ tbl.setProperty("columns",
+ "abyte,ashort,aint,along,adouble,astring,anullint,anullstring");
+ tbl.setProperty("columns.types",
+ "tinyint:smallint:int:bigint:double:string:int:string");
+ tbl.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
+ return tbl;
+ }
+
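+ /**
+ * Each helper below writes 1000 rows with a given sync-marker interval, caps
+ * mapred.max.split.size so that the split boundary falls before, right before, inside,
+ * right after or after a sync marker, and checks that two splits are produced and that
+ * the number of rows read back matches the number written.
+ */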
+ public void testSyncAndSplit() throws IOException, InterruptedException {
+ splitBeforeSync();
+ splitRightBeforeSync();
+ splitInMiddleOfSync();
+ splitRightAfterSync();
+ splitAfterSync();
+ }
+
+ private void splitBeforeSync() throws IOException,InterruptedException {
+ writeThenReadByRecordReader(600, 1000, 2, 17684, null);
+ }
+
+ private void splitRightBeforeSync() throws IOException ,InterruptedException{
+ writeThenReadByRecordReader(500, 1000, 2, 17750, null);
+ }
+
+ private void splitInMiddleOfSync() throws IOException,InterruptedException {
+ writeThenReadByRecordReader(500, 1000, 2, 17760, null);
+
+ }
+
+ private void splitRightAfterSync() throws IOException, InterruptedException {
+ writeThenReadByRecordReader(500, 1000, 2, 17770, null);
+ }
+
+ private void splitAfterSync() throws IOException ,InterruptedException{
+ writeThenReadByRecordReader(500, 1000, 2, 19950, null);
+ }
+
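+ /**
+ * Writes writeCount copies of the eight-column row with a sync marker every
+ * intervalRecordCount records, then reads the file back through
+ * RCFileMapReduceInputFormat with mapred.max.split.size set to maxSplitSize, asserting
+ * the expected number of splits and that the total rows read equals the rows written.
+ */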
+ private void writeThenReadByRecordReader(int intervalRecordCount,
+ int writeCount, int splitNumber, long maxSplitSize, CompressionCodec codec)
+ throws IOException, InterruptedException {
+ Path testDir = new Path(System.getProperty("test.data.dir", ".")
+ + "/mapred/testsmallfirstsplit");
+ Path testFile = new Path(testDir, "test_rcfile");
+ fs.delete(testFile, true);
+ Configuration cloneConf = new Configuration(conf);
+ RCFileOutputFormat.setColumnNumber(cloneConf, bytesArray.length);
+ cloneConf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, intervalRecordCount);
+
+ RCFile.Writer writer = new RCFile.Writer(fs, cloneConf, testFile, null, codec);
+
+ BytesRefArrayWritable bytes = new BytesRefArrayWritable(bytesArray.length);
+ for (int i = 0; i < bytesArray.length; i++) {
+ BytesRefWritable cu = null;
+ cu = new BytesRefWritable(bytesArray[i], 0, bytesArray[i].length);
+ bytes.set(i, cu);
+ }
+ for (int i = 0; i < writeCount; i++) {
+ writer.append(bytes);
+ }
+ writer.close();
+
+ RCFileMapReduceInputFormat<LongWritable, BytesRefArrayWritable> inputFormat = new RCFileMapReduceInputFormat<LongWritable, BytesRefArrayWritable>();
+ Configuration jonconf = new Configuration(cloneConf);
+ jonconf.set("mapred.input.dir", testDir.toString());
+ JobContext context = new Job(jonconf);
+ context.getConfiguration().setLong("mapred.max.split.size",maxSplitSize);
+ List<InputSplit> splits = inputFormat.getSplits(context);
+ assertEquals("splits length should be " + splitNumber, splits.size(), splitNumber);
+ int readCount = 0;
+ for (int i = 0; i < splits.size(); i++) {
+ TaskAttemptContext tac = new TaskAttemptContext(jonconf, new TaskAttemptID());
+ RecordReader<LongWritable, BytesRefArrayWritable> rr = inputFormat.createRecordReader(splits.get(i), tac);
+ rr.initialize(splits.get(i), tac);
+ while (rr.nextKeyValue()) {
+ readCount++;
+ }
+ }
+ assertEquals("readCount should be equal to writeCount", readCount, writeCount);
+ }
+
+}
+
+
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hcatalog.rcfile;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.rcfile.RCFileInputDriver;
+import org.apache.hcatalog.rcfile.RCFileOutputDriver;
+
+public class TestRCFileOutputStorageDriver extends TestCase {
+
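+ /**
+ * Round-trips one row: the input driver converts the columnar bytes into an HCatRecord,
+ * the output driver converts the record back, and the two byte-level representations
+ * must compare equal.
+ */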
+ public void testConversion() throws IOException {
+ Configuration conf = new Configuration();
+ JobContext jc = new JobContext(conf, new JobID());
+
+ HCatSchema schema = buildHiveSchema();
+ HCatInputStorageDriver isd = new RCFileInputDriver();
+
+ isd.setOriginalSchema(jc, schema);
+ isd.setOutputSchema(jc, schema);
+ isd.initialize(jc, new Properties());
+
+ byte[][] byteArray = buildBytesArray();
+
+ BytesRefArrayWritable bytesWritable = new BytesRefArrayWritable(byteArray.length);
+ for (int i = 0; i < byteArray.length; i++) {
+ BytesRefWritable cu = new BytesRefWritable(byteArray[i], 0, byteArray[i].length);
+ bytesWritable.set(i, cu);
+ }
+
+ // Convert the byte array to an HCatRecord using the input driver, convert the HCatRecord
+ // back to a byte array using the output driver, and compare the two arrays.
+ HCatRecord record = isd.convertToHCatRecord(null, bytesWritable);
+
+ HCatOutputStorageDriver osd = new RCFileOutputDriver();
+
+ osd.setSchema(jc, schema);
+ osd.initialize(jc, new Properties());
+
+ BytesRefArrayWritable bytesWritableOutput = (BytesRefArrayWritable) osd.convertValue(record);
+
+ assertTrue(bytesWritableOutput.compareTo(bytesWritable) == 0);
+ }
+
+ private byte[][] buildBytesArray() throws UnsupportedEncodingException {
+ byte[][] bytes = {"123".getBytes("UTF-8"), "456".getBytes("UTF-8"),
+ "789".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+ "5.3".getBytes("UTF-8"), "howl and hadoop".getBytes("UTF-8"),
+ new byte[0], "\\N".getBytes("UTF-8") };
+ return bytes;
+ }
+
+ private HCatSchema buildHiveSchema() throws HCatException{
+
+ List<FieldSchema> fields = new ArrayList<FieldSchema>(8);
+ fields.add(new FieldSchema("atinyint", "tinyint", ""));
+ fields.add(new FieldSchema("asmallint", "smallint", ""));
+ fields.add(new FieldSchema("aint", "int", ""));
+ fields.add(new FieldSchema("along", "bigint", ""));
+ fields.add(new FieldSchema("adouble", "double", ""));
+ fields.add(new FieldSchema("astring", "string", ""));
+ fields.add(new FieldSchema("anullint", "int", ""));
+ fields.add(new FieldSchema("anullstring", "string", ""));
+
+ return new HCatSchema(HCatUtil.getHCatFieldSchemaList(fields));
+ }
+}