Posted to hcatalog-commits@incubator.apache.org by ma...@apache.org on 2011/05/04 17:50:43 UTC
svn commit: r1099539 [2/3] - in /incubator/hcatalog/trunk/src:
java/org/apache/hcatalog/mapreduce/ java/org/apache/hcatalog/pig/
test/org/apache/hcatalog/mapreduce/ test/org/apache/hcatalog/pig/
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java Wed May 4 17:50:42 2011
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatEximOutputCommitter;
+import org.apache.hcatalog.mapreduce.HCatEximOutputFormat;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * HCatEximStorer. Pig StoreFunc that writes data together with table and
+ * partition metadata to a filesystem location in Hive export format.
+ */
+
+public class HCatEximStorer extends HCatBaseStorer {
+
+ private static final Log LOG = LogFactory.getLog(HCatEximStorer.class);
+
+ private final String outputLocation;
+
+ public HCatEximStorer(String outputLocation) throws FrontendException, ParseException {
+ this(outputLocation, null, null);
+ }
+
+ public HCatEximStorer(String outputLocation, String partitionSpec) throws FrontendException,
+ ParseException {
+ this(outputLocation, partitionSpec, null);
+ }
+
+ public HCatEximStorer(String outputLocation, String partitionSpec, String schema)
+ throws FrontendException, ParseException {
+ super(partitionSpec, schema);
+ this.outputLocation = outputLocation;
+ LOG.debug("HCatEximStorer called");
+ }
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ LOG.debug("getOutputFormat called");
+ return new HCatEximOutputFormat();
+ }
+
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ LOG.debug("setStoreLocation called with :" + location);
+ String[] userStr = location.split("\\.");
+ String dbname = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+ String tablename = null;
+ if (userStr.length == 2) {
+ dbname = userStr[0];
+ tablename = userStr[1];
+ } else {
+ tablename = userStr[0];
+ }
+ Properties p = UDFContext.getUDFContext()
+ .getUDFProperties(this.getClass(), new String[] {sign});
+ Configuration config = job.getConfiguration();
+ if (!HCatUtil.checkJobContextIfRunningFromBackend(job)) {
+ Schema schema = (Schema) ObjectSerializer.deserialize(p.getProperty(PIG_SCHEMA));
+ if (schema != null) {
+ pigSchema = schema;
+ }
+ if (pigSchema == null) {
+ throw new FrontendException("Schema for data cannot be determined.",
+ PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ HCatSchema hcatTblSchema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+ try {
+ doSchemaValidations(pigSchema, hcatTblSchema);
+ } catch (HCatException he) {
+ throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+ }
+
+ List<HCatFieldSchema> hcatFields = new ArrayList<HCatFieldSchema>();
+ List<String> partVals = new ArrayList<String>();
+ for (String key : partitions.keySet()) {
+ hcatFields.add(new HCatFieldSchema(key, HCatFieldSchema.Type.STRING, ""));
+ partVals.add(partitions.get(key));
+ }
+
+ HCatSchema outputSchema = convertPigSchemaToHCatSchema(pigSchema,
+ hcatTblSchema);
+ LOG.debug("Pig Schema '" + pigSchema.toString() + "' was converted to HCatSchema '"
+ + outputSchema);
+ HCatEximOutputFormat.setOutput(job,
+ dbname, tablename,
+ outputLocation,
+ new HCatSchema(hcatFields),
+ partVals,
+ outputSchema);
+ p.setProperty(COMPUTED_OUTPUT_SCHEMA, ObjectSerializer.serialize(outputSchema));
+ p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+ config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+ if (config.get(HCatConstants.HCAT_KEY_HIVE_CONF) != null) {
+ p.setProperty(HCatConstants.HCAT_KEY_HIVE_CONF,
+ config.get(HCatConstants.HCAT_KEY_HIVE_CONF));
+ }
+ } else {
+ config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+ p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+ if (p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF) != null) {
+ config.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+ p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF));
+ }
+ }
+ }
+
+ @Override
+ public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
+ if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
+ //In local mode, mapreduce will not call HCatEximOutputCommitter.cleanupJob.
+ //Calling it from here so that the partition publish happens.
+ //This call needs to be removed after MAPREDUCE-1447 is fixed.
+ new HCatEximOutputCommitter(null).cleanupJob(job);
+ }
+ }
+}
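For reference, a minimal Pig Latin sketch of how HCatEximStorer is typically
invoked (the input path, export location, and partition spec below are
hypothetical; compare the PigServer-driven usage in TestHCatEximLoader later
in this commit). The 'into' argument carries the db.table name that
setStoreLocation() splits on '.', while the constructor takes the export
location and an optional comma-separated partition spec:

    -- load some delimited data, then export it along with table metadata
    A = load '/tmp/data' using PigStorage(',')
        as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);
    store A into 'exim_db.exim_table'
        using org.apache.hcatalog.pig.HCatEximStorer('file:///tmp/export',
            'emp_country=in,emp_state=tn');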
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1099539&r1=1099538&r2=1099539&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java Wed May 4 17:50:42 2011
@@ -18,7 +18,6 @@
package org.apache.hcatalog.pig;
import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
import java.util.Properties;
@@ -27,48 +26,34 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.Pair;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.HCatTableInfo;
import org.apache.pig.Expression;
+import org.apache.pig.Expression.BinaryExpression;
import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadMetadata;
-import org.apache.pig.LoadPushDown;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceStatistics;
-import org.apache.pig.Expression.BinaryExpression;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;
/**
* Pig {@link LoadFunc} to read data from Howl
*/
-public class HCatLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
+public class HCatLoader extends HCatBaseLoader {
- private static final String PRUNE_PROJECTION_INFO = "prune.projection.info";
private static final String PARTITION_FILTER = "partition.filter"; // for future use
private HCatInputFormat howlInputFormat = null;
- private RecordReader<?, ?> reader;
private String dbName;
private String tableName;
private String howlServerUri;
- private String signature;
private String partitionFilterString;
private final PigHCatUtil phutil = new PigHCatUtil();
- HCatSchema outputSchema = null;
-
@Override
public InputFormat<?,?> getInputFormat() throws IOException {
if(howlInputFormat == null) {
@@ -78,34 +63,6 @@ public class HCatLoader extends LoadFunc
}
@Override
- public Tuple getNext() throws IOException {
- try {
- HCatRecord hr = (HCatRecord) (reader.nextKeyValue() ? reader.getCurrentValue() : null);
- Tuple t = PigHCatUtil.transformToTuple(hr,outputSchema);
- // TODO : we were discussing an iter interface, and also a LazyTuple
- // change this when plans for that solidifies.
- return t;
- } catch (ExecException e) {
- int errCode = 6018;
- String errMsg = "Error while reading input";
- throw new ExecException(errMsg, errCode,
- PigException.REMOTE_ENVIRONMENT, e);
- } catch (Exception eOther){
- int errCode = 6018;
- String errMsg = "Error converting read value to tuple";
- throw new ExecException(errMsg, errCode,
- PigException.REMOTE_ENVIRONMENT, eOther);
- }
-
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void prepareToRead(RecordReader reader, PigSplit arg1) throws IOException {
- this.reader = reader;
- }
-
- @Override
public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
return location;
}
@@ -207,12 +164,6 @@ public class HCatLoader extends LoadFunc
}
@Override
- public ResourceStatistics getStatistics(String location, Job job) throws IOException {
- // statistics not implemented currently
- return null;
- }
-
- @Override
public void setPartitionFilter(Expression partitionFilter) throws IOException {
// convert the partition filter expression into a string expected by
// howl and pass it in setLocation()
@@ -224,37 +175,6 @@ public class HCatLoader extends LoadFunc
PARTITION_FILTER, partitionFilterString);
}
- @Override
- public List<OperatorSet> getFeatures() {
- return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
- }
-
- @Override
- public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldsInfo) throws FrontendException {
- // Store the required fields information in the UDFContext so that we
- // can retrieve it later.
- storeInUDFContext(signature, PRUNE_PROJECTION_INFO, requiredFieldsInfo);
-
- // Howl will always prune columns based on what we ask of it - so the
- // response is true
- return new RequiredFieldResponse(true);
- }
-
- @Override
- public void setUDFContextSignature(String signature) {
- this.signature = signature;
- }
-
-
- // helper methods
- private void storeInUDFContext(String signature, String key, Object value) {
- UDFContext udfContext = UDFContext.getUDFContext();
- Properties props = udfContext.getUDFProperties(
- this.getClass(), new String[] {signature});
- props.put(key, value);
- }
-
-
private String getPartitionFilterString() {
if(partitionFilterString == null) {
Properties props = UDFContext.getUDFContext().getUDFProperties(
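On the loader side, setPartitionFilter() above serializes the filter
expression into the UDFContext (under PARTITION_FILTER) so it can be applied
in setLocation(). A hedged Pig Latin sketch of loader usage (table and column
names are hypothetical):

    -- read an HCatalog table; a filter on a partition column may be pushed down
    A = load 'exim_db.exim_table' using org.apache.hcatalog.pig.HCatLoader();
    B = filter A by emp_state == 'ka';
    dump B;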
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1099539&r1=1099538&r2=1099539&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Wed May 4 17:50:42 2011
@@ -19,85 +19,39 @@
package org.apache.hcatalog.pig;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.DefaultHCatRecord;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.HCatTableInfo;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceStatistics;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.StoreMetadata;
-import org.apache.pig.backend.BackendException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.util.Utils;
/**
* HowlStorer.
*
*/
-public class HCatStorer extends StoreFunc implements StoreMetadata {
+public class HCatStorer extends HCatBaseStorer {
/**
*
*/
- private static final String COMPUTED_OUTPUT_SCHEMA = "howl.output.schema";
- private final Map<String,String> partitions;
- private Schema pigSchema;
- private RecordWriter<WritableComparable<?>, HCatRecord> writer;
- private HCatSchema computedSchema;
- private static final String PIG_SCHEMA = "howl.pig.store.schema";
- private String sign;
public HCatStorer(String partSpecs, String schema) throws ParseException, FrontendException {
-
- partitions = new HashMap<String, String>();
- if(partSpecs != null && !partSpecs.trim().isEmpty()){
- String[] partKVPs = partSpecs.split(",");
- for(String partKVP : partKVPs){
- String[] partKV = partKVP.split("=");
- if(partKV.length == 2) {
- partitions.put(partKV[0].trim(), partKV[1].trim());
- } else {
- throw new FrontendException("Invalid partition column specification. "+partSpecs, PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- }
- }
-
- if(schema != null) {
- pigSchema = Utils.getSchemaFromString(schema);
- }
-
+ super(partSpecs, schema);
}
public HCatStorer(String partSpecs) throws ParseException, FrontendException {
@@ -109,353 +63,11 @@ public class HCatStorer extends StoreFun
}
@Override
- public void checkSchema(ResourceSchema resourceSchema) throws IOException {
-
- /* Schema provided by user and the schema computed by Pig
- * at the time of calling store must match.
- */
- Schema runtimeSchema = Schema.getPigSchema(resourceSchema);
- if(pigSchema != null){
- if(! Schema.equals(runtimeSchema, pigSchema, false, true) ){
- throw new FrontendException("Schema provided in store statement doesn't match with the Schema" +
- "returned by Pig run-time. Schema provided in HowlStorer: "+pigSchema.toString()+ " Schema received from Pig runtime: "+runtimeSchema.toString(), PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- } else {
- pigSchema = runtimeSchema;
- }
- UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).setProperty(PIG_SCHEMA,ObjectSerializer.serialize(pigSchema));
- }
-
- /** Constructs HCatSchema from pigSchema. Passed tableSchema is the existing
- * schema of the table in metastore.
- */
- private HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException{
-
- List<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(pigSchema.size());
- for(FieldSchema fSchema : pigSchema.getFields()){
- byte type = fSchema.type;
- HCatFieldSchema howlFSchema;
-
- try {
-
- // Find out if we need to throw away the tuple or not.
- if(type == DataType.BAG && removeTupleFromBag(tableSchema, fSchema)){
- List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
- arrFields.add(getHowlFSFromPigFS(fSchema.schema.getField(0).schema.getField(0)));
- howlFSchema = new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), null);
- }
- else{
- howlFSchema = getHowlFSFromPigFS(fSchema);
- }
- fieldSchemas.add(howlFSchema);
- } catch (HCatException he){
- throw new FrontendException(he.getMessage(),PigHCatUtil.PIG_EXCEPTION_CODE,he);
- }
- }
-
- return new HCatSchema(fieldSchemas);
- }
-
- private void validateUnNested(Schema innerSchema) throws FrontendException{
-
- for(FieldSchema innerField : innerSchema.getFields()){
- validateAlias(innerField.alias);
- if(DataType.isComplex(innerField.type)) {
- throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- }
- }
-
- private boolean removeTupleFromBag(HCatSchema tableSchema, FieldSchema bagFieldSchema) throws HCatException{
-
- String colName = bagFieldSchema.alias;
- for(HCatFieldSchema field : tableSchema.getFields()){
- if(colName.equalsIgnoreCase(field.getName())){
- return (field.getArrayElementSchema().get(0).getType() == Type.STRUCT) ? false : true;
- }
- }
- // Column was not found in table schema. Its a new column
- List<FieldSchema> tupSchema = bagFieldSchema.schema.getFields();
- return (tupSchema.size() == 1 && tupSchema.get(0).schema == null) ? true : false;
- }
-
-
- private HCatFieldSchema getHowlFSFromPigFS(FieldSchema fSchema) throws FrontendException, HCatException{
-
- byte type = fSchema.type;
- switch(type){
-
- case DataType.CHARARRAY:
- case DataType.BIGCHARARRAY:
- return new HCatFieldSchema(fSchema.alias, Type.STRING, null);
-
- case DataType.INTEGER:
- return new HCatFieldSchema(fSchema.alias, Type.INT, null);
-
- case DataType.LONG:
- return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null);
-
- case DataType.FLOAT:
- return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null);
-
- case DataType.DOUBLE:
- return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null);
-
- case DataType.BAG:
- Schema bagSchema = fSchema.schema;
- List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
- arrFields.add(getHowlFSFromPigFS(bagSchema.getField(0)));
- return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), "");
-
- case DataType.TUPLE:
- List<String> fieldNames = new ArrayList<String>();
- List<HCatFieldSchema> howlFSs = new ArrayList<HCatFieldSchema>();
- for( FieldSchema fieldSchema : fSchema.schema.getFields()){
- fieldNames.add( fieldSchema.alias);
- howlFSs.add(getHowlFSFromPigFS(fieldSchema));
- }
- return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(howlFSs), "");
-
- case DataType.MAP:{
- // Pig's schema contain no type information about map's keys and
- // values. So, if its a new column assume <string,string> if its existing
- // return whatever is contained in the existing column.
- HCatFieldSchema mapField = getTableCol(fSchema.alias, howlTblSchema);
- HCatFieldSchema valFS;
- List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1);
-
- if(mapField != null){
- Type mapValType = mapField.getMapValueSchema().get(0).getType();
-
- switch(mapValType){
- case STRING:
- case BIGINT:
- case INT:
- case FLOAT:
- case DOUBLE:
- valFS = new HCatFieldSchema(fSchema.alias, mapValType, null);
- break;
- default:
- throw new FrontendException("Only pig primitive types are supported as map value types.", PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- valFSList.add(valFS);
- return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),"");
- }
-
- // Column not found in target table. Its a new column. Its schema is map<string,string>
- valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, "");
- valFSList.add(valFS);
- return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),"");
- }
-
- default:
- throw new FrontendException("Unsupported type: "+type+" in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- }
-
- @Override
public OutputFormat getOutputFormat() throws IOException {
return new HCatOutputFormat();
}
@Override
- public void prepareToWrite(RecordWriter writer) throws IOException {
- this.writer = writer;
- computedSchema = (HCatSchema)ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).getProperty(COMPUTED_OUTPUT_SCHEMA));
- }
-
- @Override
- public void putNext(Tuple tuple) throws IOException {
-
- List<Object> outgoing = new ArrayList<Object>(tuple.size());
-
- int i = 0;
- for(HCatFieldSchema fSchema : computedSchema.getFields()){
- outgoing.add(getJavaObj(tuple.get(i++), fSchema));
- }
- try {
- writer.write(null, new DefaultHCatRecord(outgoing));
- } catch (InterruptedException e) {
- throw new BackendException("Error while writing tuple: "+tuple, PigHCatUtil.PIG_EXCEPTION_CODE, e);
- }
- }
-
- private Object getJavaObj(Object pigObj, HCatFieldSchema howlFS) throws ExecException, HCatException{
-
- // The real work-horse. Spend time and energy in this method if there is
- // need to keep HowlStorer lean and go fast.
- Type type = howlFS.getType();
-
- switch(type){
-
- case STRUCT:
- // Unwrap the tuple.
- return ((Tuple)pigObj).getAll();
- // Tuple innerTup = (Tuple)pigObj;
- //
- // List<Object> innerList = new ArrayList<Object>(innerTup.size());
- // int i = 0;
- // for(HowlTypeInfo structFieldTypeInfo : typeInfo.getAllStructFieldTypeInfos()){
- // innerList.add(getJavaObj(innerTup.get(i++), structFieldTypeInfo));
- // }
- // return innerList;
- case ARRAY:
- // Unwrap the bag.
- DataBag pigBag = (DataBag)pigObj;
- HCatFieldSchema tupFS = howlFS.getArrayElementSchema().get(0);
- boolean needTuple = tupFS.getType() == Type.STRUCT;
- List<Object> bagContents = new ArrayList<Object>((int)pigBag.size());
- Iterator<Tuple> bagItr = pigBag.iterator();
-
- while(bagItr.hasNext()){
- // If there is only one element in tuple contained in bag, we throw away the tuple.
- bagContents.add(needTuple ? getJavaObj(bagItr.next(), tupFS) : bagItr.next().get(0));
-
- }
- return bagContents;
-
- // case MAP:
- // Map<String,DataByteArray> pigMap = (Map<String,DataByteArray>)pigObj;
- // Map<String,Long> typeMap = new HashMap<String, Long>();
- // for(Entry<String, DataByteArray> entry: pigMap.entrySet()){
- // typeMap.put(entry.getKey(), new Long(entry.getValue().toString()));
- // }
- // return typeMap;
- default:
- return pigObj;
- }
- }
-
- @Override
- public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
-
- // Need to necessarily override this method since default impl assumes HDFS
- // based location string.
- return location;
- }
-
- @Override
- public void setStoreFuncUDFContextSignature(String signature) {
- sign = signature;
- }
-
-
- private void doSchemaValidations(Schema pigSchema, HCatSchema tblSchema) throws FrontendException, HCatException{
-
- // Iterate through all the elements in Pig Schema and do validations as
- // dictated by semantics, consult HCatSchema of table when need be.
-
- for(FieldSchema pigField : pigSchema.getFields()){
- byte type = pigField.type;
- String alias = pigField.alias;
- validateAlias(alias);
- HCatFieldSchema howlField = getTableCol(alias, tblSchema);
-
- if(DataType.isComplex(type)){
- switch(type){
-
- case DataType.MAP:
- if(howlField != null){
- if(howlField.getMapKeyType() != Type.STRING){
- throw new FrontendException("Key Type of map must be String "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- if(howlField.getMapValueSchema().get(0).isComplex()){
- throw new FrontendException("Value type of map cannot be complex" + howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- }
- break;
-
- case DataType.BAG:
- // Only map is allowed as complex type in tuples inside bag.
- for(FieldSchema innerField : pigField.schema.getField(0).schema.getFields()){
- if(innerField.type == DataType.BAG || innerField.type == DataType.TUPLE) {
- throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- validateAlias(innerField.alias);
- }
- if(howlField != null){
- // Do the same validation for HCatSchema.
- HCatFieldSchema arrayFieldScehma = howlField.getArrayElementSchema().get(0);
- Type hType = arrayFieldScehma.getType();
- if(hType == Type.STRUCT){
- for(HCatFieldSchema structFieldInBag : arrayFieldScehma.getStructSubSchema().getFields()){
- if(structFieldInBag.getType() == Type.STRUCT || structFieldInBag.getType() == Type.ARRAY){
- throw new FrontendException("Nested Complex types not allowed "+ howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- }
- }
- if(hType == Type.MAP){
- if(arrayFieldScehma.getMapKeyType() != Type.STRING){
- throw new FrontendException("Key Type of map must be String "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- if(arrayFieldScehma.getMapValueSchema().get(0).isComplex()){
- throw new FrontendException("Value type of map cannot be complex "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- }
- if(hType == Type.ARRAY) {
- throw new FrontendException("Arrays cannot contain array within it. "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- }
- break;
-
- case DataType.TUPLE:
- validateUnNested(pigField.schema);
- if(howlField != null){
- for(HCatFieldSchema structFieldSchema : howlField.getStructSubSchema().getFields()){
- if(structFieldSchema.isComplex()){
- throw new FrontendException("Nested Complex types are not allowed."+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- }
- }
- break;
-
- default:
- throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- }
- }
-
- for(HCatFieldSchema howlField : tblSchema.getFields()){
-
- // We dont do type promotion/demotion.
- Type hType = howlField.getType();
- switch(hType){
- case SMALLINT:
- case TINYINT:
- case BOOLEAN:
- throw new FrontendException("Incompatible type found in howl table schema: "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- }
- }
-
- private void validateAlias(String alias) throws FrontendException{
- if(alias == null) {
- throw new FrontendException("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.", PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- if(alias.matches(".*[A-Z]+.*")) {
- throw new FrontendException("Column names should all be in lowercase. Invalid name found: "+alias, PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- }
-
- // Finds column by name in HCatSchema, if not found returns null.
- private HCatFieldSchema getTableCol(String alias, HCatSchema tblSchema){
-
- for(HCatFieldSchema howlField : tblSchema.getFields()){
- if(howlField.getName().equalsIgnoreCase(alias)){
- return howlField;
- }
- }
- // Its a new column
- return null;
- }
- HCatSchema howlTblSchema;
-
- @Override
- public void cleanupOnFailure(String location, Job job) throws IOException {
- // No-op.
- }
-
- @Override
public void setStoreLocation(String location, Job job) throws IOException {
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
@@ -489,7 +101,7 @@ public class HCatStorer extends StoreFun
// information passed to HCatOutputFormat was not right
throw new PigException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
}
- howlTblSchema = HCatOutputFormat.getTableSchema(job);
+ HCatSchema howlTblSchema = HCatOutputFormat.getTableSchema(job);
try{
doSchemaValidations(pigSchema, howlTblSchema);
} catch(HCatException he){
@@ -528,8 +140,4 @@ public class HCatStorer extends StoreFun
new HCatOutputCommitter(null).cleanupJob(job);
}
}
-
- @Override
- public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException {
- }
}
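As the constructor logic now inherited from HCatBaseStorer shows, HCatStorer
accepts a comma-separated list of key=value partition pairs, plus an optional
second argument giving the full Pig schema of the data. A hedged sketch
(table name, partition key, and schema are hypothetical):

    -- store into a single partition of an existing HCatalog table
    store A into 'demo_table'
        using org.apache.hcatalog.pig.HCatStorer('ds=20110504',
            'emp_id:int, emp_name:chararray');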
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java Wed May 4 17:50:42 2011
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.TestHCatEximInputFormat.TestImport.EmpDetails;
+
+/**
+ *
+ * TestHCatEximInputFormat. Tests primarily HCatEximInputFormat, but
+ * also exercises HCatEximOutputFormat.
+ *
+ */
+public class TestHCatEximInputFormat extends TestCase {
+
+ public static class TestExport extends
+ org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, HCatRecord> {
+
+ private HCatSchema recordSchema;
+
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ super.setup(context);
+ recordSchema = HCatEximOutputFormat.getTableSchema(context);
+ }
+
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ String[] cols = value.toString().split(",");
+ HCatRecord record = new DefaultHCatRecord(recordSchema.size());
+ record.setInteger("emp_id", recordSchema, Integer.parseInt(cols[0]));
+ record.setString("emp_name", recordSchema, cols[1]);
+ record.setString("emp_dob", recordSchema, cols[2]);
+ record.setString("emp_sex", recordSchema, cols[3]);
+ context.write(key, record);
+ }
+ }
+
+ public static class TestImport extends
+ org.apache.hadoop.mapreduce.Mapper<
+ org.apache.hadoop.io.LongWritable, HCatRecord,
+ org.apache.hadoop.io.Text,
+ org.apache.hadoop.io.Text> {
+
+ private HCatSchema recordSchema;
+
+ public static class EmpDetails {
+ public String emp_name;
+ public String emp_dob;
+ public String emp_sex;
+ public String emp_country;
+ public String emp_state;
+ }
+
+ public static Map<Integer, EmpDetails> empRecords = new TreeMap<Integer, EmpDetails>();
+
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ super.setup(context);
+ try {
+ recordSchema = HCatBaseInputFormat.getOutputSchema(context);
+ } catch (Exception e) {
+ throw new IOException("Error getting outputschema from job configuration", e);
+ }
+ System.out.println("RecordSchema : " + recordSchema.toString());
+ }
+
+ @Override
+ public void map(LongWritable key, HCatRecord value, Context context)
+ throws IOException, InterruptedException {
+ EmpDetails empDetails = new EmpDetails();
+ Integer emp_id = value.getInteger("emp_id", recordSchema);
+ String emp_name = value.getString("emp_name", recordSchema);
+ empDetails.emp_name = emp_name;
+ if (recordSchema.getPosition("emp_dob") != null) {
+ empDetails.emp_dob = value.getString("emp_dob", recordSchema);
+ }
+ if (recordSchema.getPosition("emp_sex") != null) {
+ empDetails.emp_sex = value.getString("emp_sex", recordSchema);
+ }
+ if (recordSchema.getPosition("emp_country") != null) {
+ empDetails.emp_country = value.getString("emp_country", recordSchema);
+ }
+ if (recordSchema.getPosition("emp_state") != null) {
+ empDetails.emp_state = value.getString("emp_state", recordSchema);
+ }
+ empRecords.put(emp_id, empDetails);
+ }
+ }
+
+ private static final String dbName = "hcatEximOutputFormatTestDB";
+ private static final String tblName = "hcatEximOutputFormatTestTable";
+ Configuration conf;
+ Job job;
+ List<HCatFieldSchema> columns;
+ HCatSchema schema;
+ FileSystem fs;
+ Path inputLocation;
+ Path outputLocation;
+ private HCatSchema partSchema;
+
+
+ @Override
+ protected void setUp() throws Exception {
+ System.out.println("Setup started");
+ super.setUp();
+ conf = new Configuration();
+ job = new Job(conf, "test eximinputformat");
+ columns = new ArrayList<HCatFieldSchema>();
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
+ Constants.INT_TYPE_NAME, "")));
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
+ Constants.STRING_TYPE_NAME, "")));
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob",
+ Constants.STRING_TYPE_NAME, "")));
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex",
+ Constants.STRING_TYPE_NAME, "")));
+ schema = new HCatSchema(columns);
+
+ fs = new LocalFileSystem();
+ fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
+ inputLocation = new Path(fs.getWorkingDirectory(), "tmp/exports");
+ outputLocation = new Path(fs.getWorkingDirectory(), "tmp/data");
+
+ job.setJarByClass(this.getClass());
+ job.setNumReduceTasks(0);
+ System.out.println("Setup done");
+ }
+
+ private void setupMRExport(String[] records) throws IOException {
+ if (fs.exists(outputLocation)) {
+ fs.delete(outputLocation, true);
+ }
+ FSDataOutputStream ds = fs.create(outputLocation, true);
+ for (String record : records) {
+ ds.writeBytes(record);
+ }
+ ds.close();
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(HCatEximOutputFormat.class);
+ TextInputFormat.setInputPaths(job, outputLocation);
+ job.setMapperClass(TestExport.class);
+ }
+
+ private void setupMRImport() throws IOException {
+ if (fs.exists(outputLocation)) {
+ fs.delete(outputLocation, true);
+ }
+ job.setInputFormatClass(HCatEximInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ TextOutputFormat.setOutputPath(job, outputLocation);
+ job.setMapperClass(TestImport.class);
+ TestImport.empRecords.clear();
+ }
+
+
+ @Override
+ protected void tearDown() throws Exception {
+ System.out.println("Teardown started");
+ super.tearDown();
+ // fs.delete(inputLocation, true);
+ // fs.delete(outputLocation, true);
+ System.out.println("Teardown done");
+ }
+
+
+ private void runNonPartExport() throws IOException, InterruptedException, ClassNotFoundException {
+ if (fs.exists(inputLocation)) {
+ fs.delete(inputLocation, true);
+ }
+ setupMRExport(new String[] {
+ "237,Krishna,01/01/1990,M,IN,TN\n",
+ "238,Kalpana,01/01/2000,F,IN,KA\n",
+ "239,Satya,01/01/2001,M,US,TN\n",
+ "240,Kavya,01/01/2002,F,US,KA\n"
+
+ });
+ HCatEximOutputFormat.setOutput(
+ job,
+ dbName,
+ tblName,
+ inputLocation.toString(),
+ null,
+ null,
+ schema);
+
+ job.waitForCompletion(true);
+ HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+ committer.cleanupJob(job);
+ }
+
+ private void runPartExport(String record, String country, String state) throws IOException, InterruptedException, ClassNotFoundException {
+ setupMRExport(new String[] {record});
+ List<String> partValues = new ArrayList<String>(2);
+ partValues.add(country);
+ partValues.add(state);
+ HCatEximOutputFormat.setOutput(
+ job,
+ dbName,
+ tblName,
+ inputLocation.toString(),
+ partSchema,
+ partValues,
+ schema);
+
+ job.waitForCompletion(true);
+ HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+ committer.cleanupJob(job);
+ }
+
+ public void testNonPart() throws Exception {
+ try {
+ runNonPartExport();
+ setUp();
+ setupMRImport();
+ HCatEximInputFormat.setInput(job, "tmp/exports", null);
+ job.waitForCompletion(true);
+
+ assertEquals(4, TestImport.empRecords.size());
+ assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", null, null);
+ assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", null, null);
+ assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", null, null);
+ assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", null, null);
+ } catch (Exception e) {
+ System.out.println("Test failed with " + e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ public void testNonPartProjection() throws Exception {
+ try {
+
+ runNonPartExport();
+ setUp();
+ setupMRImport();
+ HCatEximInputFormat.setInput(job, "tmp/exports", null);
+
+ List<HCatFieldSchema> readColumns = new ArrayList<HCatFieldSchema>();
+ readColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
+ Constants.INT_TYPE_NAME, "")));
+ readColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
+ Constants.STRING_TYPE_NAME, "")));
+
+ HCatEximInputFormat.setOutputSchema(job, new HCatSchema(readColumns));
+ job.waitForCompletion(true);
+
+ assertEquals(4, TestImport.empRecords.size());
+ assertEmpDetail(TestImport.empRecords.get(237), "Krishna", null, null, null, null);
+ assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", null, null, null, null);
+ assertEmpDetail(TestImport.empRecords.get(239), "Satya", null, null, null, null);
+ assertEmpDetail(TestImport.empRecords.get(240), "Kavya", null, null, null, null);
+ } catch (Exception e) {
+ System.out.println("Test failed with " + e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ public void testPart() throws Exception {
+ try {
+ if (fs.exists(inputLocation)) {
+ fs.delete(inputLocation, true);
+ }
+
+ List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2);
+ partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, ""));
+ partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, ""));
+ partSchema = new HCatSchema(partKeys);
+
+ runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn");
+ setUp();
+ runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka");
+ setUp();
+ runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn");
+ setUp();
+ runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka");
+
+ setUp();
+ setupMRImport();
+ HCatEximInputFormat.setInput(job, "tmp/exports", null);
+ job.waitForCompletion(true);
+
+ assertEquals(4, TestImport.empRecords.size());
+ assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", "in", "tn");
+ assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka");
+ assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", "us", "tn");
+ assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka");
+ } catch (Exception e) {
+ System.out.println("Test failed with " + e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ public void testPartWithPartCols() throws Exception {
+ try {
+ if (fs.exists(inputLocation)) {
+ fs.delete(inputLocation, true);
+ }
+
+ List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2);
+ partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, ""));
+ partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, ""));
+ partSchema = new HCatSchema(partKeys);
+
+ runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn");
+ setUp();
+ runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka");
+ setUp();
+ runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn");
+ setUp();
+ runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka");
+
+ setUp();
+ setupMRImport();
+ HCatEximInputFormat.setInput(job, "tmp/exports", null);
+
+ List<HCatFieldSchema> colsPlusPartKeys = new ArrayList<HCatFieldSchema>();
+ colsPlusPartKeys.addAll(columns);
+ colsPlusPartKeys.addAll(partKeys);
+
+ HCatBaseInputFormat.setOutputSchema(job, new HCatSchema(colsPlusPartKeys));
+ job.waitForCompletion(true);
+
+ assertEquals(4, TestImport.empRecords.size());
+ assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", "in", "tn");
+ assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka");
+ assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", "us", "tn");
+ assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka");
+ } catch (Exception e) {
+ System.out.println("Test failed with " + e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+
+ public void testPartSelection() throws Exception {
+ try {
+ if (fs.exists(inputLocation)) {
+ fs.delete(inputLocation, true);
+ }
+
+ List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2);
+ partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, ""));
+ partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, ""));
+ partSchema = new HCatSchema(partKeys);
+
+ runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn");
+ setUp();
+ runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka");
+ setUp();
+ runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn");
+ setUp();
+ runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka");
+
+ setUp();
+ setupMRImport();
+ Map<String, String> filter = new TreeMap<String, String>();
+ filter.put("emp_state", "ka");
+ HCatEximInputFormat.setInput(job, "tmp/exports", filter);
+ job.waitForCompletion(true);
+
+ assertEquals(2, TestImport.empRecords.size());
+ assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka");
+ assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka");
+ } catch (Exception e) {
+ System.out.println("Test failed with " + e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+
+ private void assertEmpDetail(EmpDetails empDetails, String name, String dob, String mf, String country, String state) {
+ assertNotNull(empDetails);
+ assertEquals(name, empDetails.emp_name);
+ assertEquals(dob, empDetails.emp_dob);
+ assertEquals(mf, empDetails.emp_sex);
+ assertEquals(country, empDetails.emp_country);
+ assertEquals(state, empDetails.emp_state);
+ }
+
+}
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java Wed May 4 17:50:42 2011
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+
+/**
+ *
+ * TestHCatEximOutputFormat. Basic tests only; more thorough coverage is
+ * provided by TestHCatEximInputFormat.
+ *
+ */
+public class TestHCatEximOutputFormat extends TestCase {
+
+ public static class TestMap extends
+ Mapper<LongWritable, Text, LongWritable, HCatRecord> {
+
+ private HCatSchema recordSchema;
+
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ super.setup(context);
+ recordSchema = HCatEximOutputFormat.getTableSchema(context);
+ System.out.println("TestMap/setup called");
+ }
+
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ String[] cols = value.toString().split(",");
+ HCatRecord record = new DefaultHCatRecord(recordSchema.size());
+ System.out.println("TestMap/map called. Cols[0]:" + cols[0]);
+ System.out.println("TestMap/map called. Cols[1]:" + cols[1]);
+ System.out.println("TestMap/map called. Cols[2]:" + cols[2]);
+ System.out.println("TestMap/map called. Cols[3]:" + cols[3]);
+ record.setInteger("emp_id", recordSchema, Integer.parseInt(cols[0]));
+ record.setString("emp_name", recordSchema, cols[1]);
+ record.setString("emp_dob", recordSchema, cols[2]);
+ record.setString("emp_sex", recordSchema, cols[3]);
+ context.write(key, record);
+ }
+ }
+
+
+ private static final String dbName = "hcatEximOutputFormatTestDB";
+ private static final String tblName = "hcatEximOutputFormatTestTable";
+ Configuration conf;
+ Job job;
+ List<HCatFieldSchema> columns;
+ HCatSchema schema;
+ FileSystem fs;
+ Path outputLocation;
+ Path dataLocation;
+
+ public void testNonPart() throws Exception {
+ try {
+ HCatEximOutputFormat.setOutput(
+ job,
+ dbName,
+ tblName,
+ outputLocation.toString(),
+ null,
+ null,
+ schema);
+
+ job.waitForCompletion(true);
+ HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+ committer.cleanupJob(job);
+
+ Path metadataPath = new Path(outputLocation, "_metadata");
+ Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath);
+ Table table = rv.getKey();
+ List<Partition> partitions = rv.getValue();
+
+ assertEquals(dbName, table.getDbName());
+ assertEquals(tblName, table.getTableName());
+ assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
+ HCatUtil.getFieldSchemaList(columns)));
+ assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+ table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+ assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+ table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+ assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
+ table.getSd().getInputFormat());
+ assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
+ table.getSd().getOutputFormat());
+ assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
+ table.getSd().getSerdeInfo().getSerializationLib());
+ assertEquals(0, table.getPartitionKeys().size());
+
+ assertEquals(0, partitions.size());
+ } catch (Exception e) {
+ System.out.println("Test failed with " + e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+
+ }
+
+ public void testPart() throws Exception {
+ try {
+ List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>();
+ partKeys.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_country",
+ Constants.STRING_TYPE_NAME, "")));
+ partKeys.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_state",
+ Constants.STRING_TYPE_NAME, "")));
+ HCatSchema partitionSchema = new HCatSchema(partKeys);
+
+ List<String> partitionVals = new ArrayList<String>();
+ partitionVals.add("IN");
+ partitionVals.add("TN");
+
+ HCatEximOutputFormat.setOutput(
+ job,
+ dbName,
+ tblName,
+ outputLocation.toString(),
+ partitionSchema,
+ partitionVals,
+ schema);
+
+ job.waitForCompletion(true);
+ HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+ committer.cleanupJob(job);
+ Path metadataPath = new Path(outputLocation, "_metadata");
+ Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath);
+ Table table = rv.getKey();
+ List<Partition> partitions = rv.getValue();
+
+ assertEquals(dbName, table.getDbName());
+ assertEquals(tblName, table.getTableName());
+ assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
+ HCatUtil.getFieldSchemaList(columns)));
+ assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+ table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+ assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+ table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+ assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
+ table.getSd().getInputFormat());
+ assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
+ table.getSd().getOutputFormat());
+ assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
+ table.getSd().getSerdeInfo().getSerializationLib());
+ assertEquals(2, table.getPartitionKeys().size());
+ List<FieldSchema> partSchema = table.getPartitionKeys();
+ assertEquals("emp_country", partSchema.get(0).getName());
+ assertEquals("emp_state", partSchema.get(1).getName());
+
+ assertEquals(1, partitions.size());
+ Partition partition = partitions.get(0);
+ assertEquals("IN", partition.getValues().get(0));
+ assertEquals("TN", partition.getValues().get(1));
+ assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+ partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+ assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+ partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+ } catch (Exception e) {
+ System.out.println("Test failed with " + e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ System.out.println("Setup started");
+ super.setUp();
+ conf = new Configuration();
+ job = new Job(conf, "test eximoutputformat");
+ columns = new ArrayList<HCatFieldSchema>();
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
+ Constants.INT_TYPE_NAME, "")));
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
+ Constants.STRING_TYPE_NAME, "")));
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob",
+ Constants.STRING_TYPE_NAME, "")));
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex",
+ Constants.STRING_TYPE_NAME, "")));
+ schema = new HCatSchema(columns);
+
+ fs = new LocalFileSystem();
+ fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
+ outputLocation = new Path(fs.getWorkingDirectory(), "tmp/exports");
+ if (fs.exists(outputLocation)) {
+ fs.delete(outputLocation, true);
+ }
+ dataLocation = new Path(fs.getWorkingDirectory(), "tmp/data");
+ if (fs.exists(dataLocation)) {
+ fs.delete(dataLocation, true);
+ }
+ FSDataOutputStream ds = fs.create(dataLocation, true);
+ ds.writeBytes("237,Krishna,01/01/1990,M,IN,TN\n");
+ ds.writeBytes("238,Kalpana,01/01/2000,F,IN,KA\n");
+ ds.writeBytes("239,Satya,01/01/2001,M,US,TN\n");
+ ds.writeBytes("240,Kavya,01/01/2002,F,US,KA\n");
+ ds.close();
+
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(HCatEximOutputFormat.class);
+ TextInputFormat.setInputPaths(job, dataLocation);
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(TestMap.class);
+ job.setNumReduceTasks(0);
+ System.out.println("Setup done");
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ System.out.println("Teardown started");
+ super.tearDown();
+ fs.delete(dataLocation, true);
+ fs.delete(outputLocation, true);
+ System.out.println("Teardown done");
+ }
+}
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximLoader.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximLoader.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximLoader.java Wed May 4 17:50:42 2011
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.hcatalog.MiniCluster;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ *
+ * TestHCatEximLoader. Assumes the Exim storer works correctly.
+ *
+ */
+public class TestHCatEximLoader extends TestCase {
+
+ private static final String NONPART_TABLE = "junit_unparted";
+ private static final String PARTITIONED_TABLE = "junit_parted";
+ private static MiniCluster cluster = MiniCluster.buildCluster();
+
+ private static final String dataLocation = "/tmp/data";
+ private static String fqdataLocation;
+ private static final String exportLocation = "/tmp/export";
+ private static String fqexportLocation;
+
+ private static Properties props;
+
+ private void cleanup() throws IOException {
+ MiniCluster.deleteFile(cluster, dataLocation);
+ MiniCluster.deleteFile(cluster, exportLocation);
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ props = new Properties();
+ props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+ System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName()
+ + ", fs.default.name : " + props.getProperty("fs.default.name"));
+ fqdataLocation = cluster.getProperties().getProperty("fs.default.name") + dataLocation;
+ fqexportLocation = cluster.getProperties().getProperty("fs.default.name") + exportLocation;
+ System.out.println("FQ Data Location :" + fqdataLocation);
+ System.out.println("FQ Export Location :" + fqexportLocation);
+ cleanup();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ cleanup();
+ }
+
+ private void populateDataFile() throws IOException {
+ MiniCluster.deleteFile(cluster, dataLocation);
+ String[] input = new String[] {
+ "237,Krishna,01/01/1990,M,IN,TN",
+ "238,Kalpana,01/01/2000,F,IN,KA",
+ "239,Satya,01/01/2001,M,US,TN",
+ "240,Kavya,01/01/2002,F,US,KA"
+ };
+ MiniCluster.createInputFile(cluster, dataLocation, input);
+ }
+
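+ /** Expected employee record; the tests key these by emp_id to verify loaded tuples. */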
+ private static class EmpDetail {
+ String name;
+ String dob;
+ String mf;
+ String country;
+ String state;
+ }
+
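+ // Checks a loaded 6-column tuple against the expected map and removes the matched
+ // entry; an empty map afterwards proves every record was loaded exactly once.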
+ private void assertEmpDetail(Tuple t, Map<Integer, EmpDetail> eds) throws ExecException {
+ assertNotNull(t);
+ assertEquals(6, t.size());
+
+ assertTrue(t.get(0).getClass() == Integer.class);
+ assertTrue(t.get(1).getClass() == String.class);
+ assertTrue(t.get(2).getClass() == String.class);
+ assertTrue(t.get(3).getClass() == String.class);
+ assertTrue(t.get(4).getClass() == String.class);
+ assertTrue(t.get(5).getClass() == String.class);
+
+ EmpDetail ed = eds.remove(t.get(0));
+ assertNotNull(ed);
+
+ assertEquals(ed.name, t.get(1));
+ assertEquals(ed.dob, t.get(2));
+ assertEquals(ed.mf, t.get(3));
+ assertEquals(ed.country, t.get(4));
+ assertEquals(ed.state, t.get(5));
+ }
+
+ private void addEmpDetail(Map<Integer, EmpDetail> empDetails, int id, String name,
+ String dob, String mf, String country, String state) {
+ EmpDetail ed = new EmpDetail();
+ ed.name = name;
+ ed.dob = dob;
+ ed.mf = mf;
+ ed.country = country;
+ ed.state = state;
+ empDetails.put(id, ed);
+ }
+
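+ // Checks a 4-column tuple from the non-partitioned table.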
+ private void assertEmpDetail(Tuple t, Integer id, String name, String dob, String mf)
+ throws ExecException {
+ assertNotNull(t);
+ assertEquals(4, t.size());
+ assertTrue(t.get(0).getClass() == Integer.class);
+ assertTrue(t.get(1).getClass() == String.class);
+ assertTrue(t.get(2).getClass() == String.class);
+ assertTrue(t.get(3).getClass() == String.class);
+
+ assertEquals(id, t.get(0));
+ assertEquals(name, t.get(1));
+ assertEquals(dob, t.get(2));
+ assertEquals(mf, t.get(3));
+ }
+
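+ // Checks a 2-column (emp_sex, emp_name) projection tuple.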
+ private void assertEmpDetail(Tuple t, String mf, String name)
+ throws ExecException {
+ assertNotNull(t);
+ assertEquals(2, t.size());
+ assertTrue(t.get(0).getClass() == String.class);
+ assertTrue(t.get(1).getClass() == String.class);
+
+ assertEquals(mf, t.get(0));
+ assertEquals(name, t.get(1));
+ }
+
+ public void testLoadNonPartTable() throws Exception {
+ populateDataFile();
+ {
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server
+ .registerQuery("A = load '"
+ + fqdataLocation
+ + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);");
+ server.registerQuery("store A into '" + NONPART_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');");
+ server.executeBatch();
+ }
+ {
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+
+ server
+ .registerQuery("A = load '"
+ + fqexportLocation
+ + "' using org.apache.hcatalog.pig.HCatEximLoader();");
+ Iterator<Tuple> XIter = server.openIterator("A");
+ assertTrue(XIter.hasNext());
+ Tuple t = XIter.next();
+ assertEmpDetail(t, 237, "Krishna", "01/01/1990", "M");
+ assertTrue(XIter.hasNext());
+ t = XIter.next();
+ assertEmpDetail(t, 238, "Kalpana", "01/01/2000", "F");
+ assertTrue(XIter.hasNext());
+ t = XIter.next();
+ assertEmpDetail(t, 239, "Satya", "01/01/2001", "M");
+ assertTrue(XIter.hasNext());
+ t = XIter.next();
+ assertEmpDetail(t, 240, "Kavya", "01/01/2002", "F");
+ assertFalse(XIter.hasNext());
+ }
+ }
+
+ public void testLoadNonPartProjection() throws Exception {
+ populateDataFile();
+ {
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server
+ .registerQuery("A = load '"
+ + fqdataLocation
+ + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);");
+ server.registerQuery("store A into '" + NONPART_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');");
+ server.executeBatch();
+ }
+ {
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+
+ server
+ .registerQuery("A = load '"
+ + fqexportLocation
+ + "' using org.apache.hcatalog.pig.HCatEximLoader();");
+ server.registerQuery("B = foreach A generate emp_sex, emp_name;");
+
+ Iterator<Tuple> XIter = server.openIterator("B");
+ assertTrue(XIter.hasNext());
+ Tuple t = XIter.next();
+ assertEmpDetail(t, "M", "Krishna");
+ assertTrue(XIter.hasNext());
+ t = XIter.next();
+ assertEmpDetail(t, "F", "Kalpana");
+ assertTrue(XIter.hasNext());
+ t = XIter.next();
+ assertEmpDetail(t, "M", "Satya");
+ assertTrue(XIter.hasNext());
+ t = XIter.next();
+ assertEmpDetail(t, "F", "Kavya");
+ assertFalse(XIter.hasNext());
+ }
+ }
+
+ public void testLoadMultiPartTable() throws Exception {
+ {
+ populateDataFile();
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server
+ .registerQuery("A = load '"
+ + fqdataLocation +
+ "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);"
+ );
+ server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';");
+ server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';");
+ server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';");
+ server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';");
+ server.registerQuery("store INTN into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=in,emp_state=tn');");
+ server.registerQuery("store INKA into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=in,emp_state=ka');");
+ server.registerQuery("store USTN into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=us,emp_state=tn');");
+ server.registerQuery("store USKA into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=us,emp_state=ka');");
+ server.executeBatch();
+ }
+ {
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+
+ // No explicit schema: the loader supplies it (including the partition
+ // columns emp_country and emp_state) from the export metadata.
+ server.registerQuery("A = load '" + fqexportLocation
+ + "' using org.apache.hcatalog.pig.HCatEximLoader();");
+
+ Iterator<Tuple> XIter = server.openIterator("A");
+
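+ // Loading the export root returns rows from all four partitions, with the
+ // partition column values from the specs (lowercase) appended to each tuple.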
+ Map<Integer, EmpDetail> empDetails = new TreeMap<Integer, EmpDetail>();
+ addEmpDetail(empDetails, 237, "Krishna", "01/01/1990", "M", "in", "tn");
+ addEmpDetail(empDetails, 238, "Kalpana", "01/01/2000", "F", "in", "ka");
+ addEmpDetail(empDetails, 239, "Satya", "01/01/2001", "M", "us", "tn");
+ addEmpDetail(empDetails, 240, "Kavya", "01/01/2002", "F", "us", "ka");
+
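+ // Every loaded tuple must match and consume exactly one expected record.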
+ while (XIter.hasNext()) {
+ Tuple t = XIter.next();
+ assertFalse(empDetails.isEmpty());
+ assertEmpDetail(t, empDetails);
+ }
+ assertEquals(0, empDetails.size());
+ }
+ }
+
+ public void testLoadMultiPartFilter() throws Exception {
+ {
+ populateDataFile();
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server
+ .registerQuery("A = load '"
+ + fqdataLocation +
+ "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);"
+ );
+ server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';");
+ server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';");
+ server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';");
+ server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';");
+ server.registerQuery("store INTN into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=in,emp_state=tn');");
+ server.registerQuery("store INKA into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=in,emp_state=ka');");
+ server.registerQuery("store USTN into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=us,emp_state=tn');");
+ server.registerQuery("store USKA into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=us,emp_state=ka');");
+ server.executeBatch();
+ }
+ {
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+
+ server.registerQuery("A = load '" + fqexportLocation
+ + "' using org.apache.hcatalog.pig.HCatEximLoader();");
+ server.registerQuery("B = filter A by emp_state == 'ka';");
+
+ Iterator<Tuple> XIter = server.openIterator("B");
+
+ Map<Integer, EmpDetail> empDetails = new TreeMap<Integer, EmpDetail>();
+ addEmpDetail(empDetails, 238, "Kalpana", "01/01/2000", "F", "in", "ka");
+ addEmpDetail(empDetails, 240, "Kavya", "01/01/2002", "F", "us", "ka");
+
+ while (XIter.hasNext()) {
+ Tuple t = XIter.next();
+ assertFalse(empDetails.isEmpty());
+ assertEmpDetail(t, empDetails);
+ }
+ assertEquals(0, empDetails.size());
+ }
+ }
+
+}