You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2013/09/06 02:49:17 UTC

svn commit: r1520466 [13/18] - in /hive/trunk/hcatalog: core/src/main/java/org/apache/hcatalog/cli/ core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/ core/src/main/java/org/apache/hcatalog/common/ core/src/main/java/org/apache/hcatalog/data/...

Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.PartInfo;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Base class for HCatLoader and HCatEximLoader
+ */
+
+abstract class HCatBaseLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
+
+    protected static final String PRUNE_PROJECTION_INFO = "prune.projection.info";
+
+    private RecordReader<?, ?> reader;
+    protected String signature;
+
+    HCatSchema outputSchema = null;
+
+
+    @Override
+    public Tuple getNext() throws IOException {
+        try {
+            HCatRecord hr = (HCatRecord) (reader.nextKeyValue() ? reader.getCurrentValue() : null);
+            Tuple t = PigHCatUtil.transformToTuple(hr, outputSchema);
+            // TODO : we were discussing an iter interface, and also a LazyTuple
+            // change this when plans for that solidifies.
+            return t;
+        } catch (ExecException e) {
+            int errCode = 6018;
+            String errMsg = "Error while reading input";
+            throw new ExecException(errMsg, errCode,
+                PigException.REMOTE_ENVIRONMENT, e);
+        } catch (Exception eOther) {
+            int errCode = 6018;
+            String errMsg = "Error converting read value to tuple";
+            throw new ExecException(errMsg, errCode,
+                PigException.REMOTE_ENVIRONMENT, eOther);
+        }
+
+    }
+
+    @Override
+    public void prepareToRead(RecordReader reader, PigSplit arg1) throws IOException {
+        this.reader = reader;
+    }
+
+    @Override
+    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+        // statistics not implemented currently
+        return null;
+    }
+
+    @Override
+    public List<OperatorSet> getFeatures() {
+        return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+    }
+
+    @Override
+    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldsInfo) throws FrontendException {
+        // Store the required fields information in the UDFContext so that we
+        // can retrieve it later.
+        storeInUDFContext(signature, PRUNE_PROJECTION_INFO, requiredFieldsInfo);
+
+        // HCat will always prune columns based on what we ask of it - so the
+        // response is true
+        return new RequiredFieldResponse(true);
+    }
+
+    @Override
+    public void setUDFContextSignature(String signature) {
+        this.signature = signature;
+    }
+
+
+    // helper methods
+    protected void storeInUDFContext(String signature, String key, Object value) {
+        UDFContext udfContext = UDFContext.getUDFContext();
+        Properties props = udfContext.getUDFProperties(
+            this.getClass(), new String[]{signature});
+        props.put(key, value);
+    }
+
+    /**
+     * A utility method to get the size of inputs. This is accomplished by summing the
+     * size of all input paths on supported FileSystems. Locations whose size cannot be
+     * determined are ignored. Note non-FileSystem and unpartitioned locations will not
+     * report their input size by default.
+     */
+    protected static long getSizeInBytes(InputJobInfo inputJobInfo) throws IOException {
+        Configuration conf = new Configuration();
+        long sizeInBytes = 0;
+
+        for (PartInfo partInfo : inputJobInfo.getPartitions()) {
+            try {
+                Path p = new Path(partInfo.getLocation());
+                if (p.getFileSystem(conf).isFile(p)) {
+                    sizeInBytes += p.getFileSystem(conf).getFileStatus(p).getLen();
+                } else {
+                    FileStatus[] fileStatuses = p.getFileSystem(conf).listStatus(p);
+                    if (fileStatuses != null) {
+                        for (FileStatus child : fileStatuses) {
+                            sizeInBytes += child.getLen();
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                // Report size to the extent possible.
+            }
+        }
+
+        return sizeInBytes;
+    }
+}

Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * Base class for HCatStorer and HCatEximStorer
+ *
+ */
+
+abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata {
+
+    private static final List<Type> SUPPORTED_INTEGER_CONVERSIONS =
+        Lists.newArrayList(Type.TINYINT, Type.SMALLINT, Type.INT);
+    protected static final String COMPUTED_OUTPUT_SCHEMA = "hcat.output.schema";
+    protected final List<String> partitionKeys;
+    protected final Map<String, String> partitions;
+    protected Schema pigSchema;
+    private RecordWriter<WritableComparable<?>, HCatRecord> writer;
+    protected HCatSchema computedSchema;
+    protected static final String PIG_SCHEMA = "hcat.pig.store.schema";
+    protected String sign;
+
+    public HCatBaseStorer(String partSpecs, String schema) throws Exception {
+
+        partitionKeys = new ArrayList<String>();
+        partitions = new HashMap<String, String>();
+        if (partSpecs != null && !partSpecs.trim().isEmpty()) {
+            String[] partKVPs = partSpecs.split(",");
+            for (String partKVP : partKVPs) {
+                String[] partKV = partKVP.split("=");
+                if (partKV.length == 2) {
+                    String partKey = partKV[0].trim();
+                    partitionKeys.add(partKey);
+                    partitions.put(partKey, partKV[1].trim());
+                } else {
+                    throw new FrontendException("Invalid partition column specification. " + partSpecs, PigHCatUtil.PIG_EXCEPTION_CODE);
+                }
+            }
+        }
+
+        if (schema != null) {
+            pigSchema = Utils.getSchemaFromString(schema);
+        }
+
+    }
+
+    @Override
+    public void checkSchema(ResourceSchema resourceSchema) throws IOException {
+
+        /*  Schema provided by user and the schema computed by Pig
+        * at the time of calling store must match.
+        */
+        Schema runtimeSchema = Schema.getPigSchema(resourceSchema);
+        if (pigSchema != null) {
+            if (!Schema.equals(runtimeSchema, pigSchema, false, true)) {
+                throw new FrontendException("Schema provided in store statement doesn't match with the Schema" +
+                    "returned by Pig run-time. Schema provided in HCatStorer: " + pigSchema.toString() + " Schema received from Pig runtime: " + runtimeSchema.toString(), PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+        } else {
+            pigSchema = runtimeSchema;
+        }
+        UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).setProperty(PIG_SCHEMA, ObjectSerializer.serialize(pigSchema));
+    }
+
+    /** Constructs HCatSchema from pigSchema. Passed tableSchema is the existing
+     * schema of the table in metastore.
+     */
+    protected HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException {
+        List<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(pigSchema.size());
+        for (FieldSchema fSchema : pigSchema.getFields()) {
+            try {
+                HCatFieldSchema hcatFieldSchema = getColFromSchema(fSchema.alias, tableSchema);
+
+                fieldSchemas.add(getHCatFSFromPigFS(fSchema, hcatFieldSchema));
+            } catch (HCatException he) {
+                throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+            }
+        }
+        return new HCatSchema(fieldSchemas);
+    }
+
+    public static boolean removeTupleFromBag(HCatFieldSchema hcatFieldSchema, FieldSchema bagFieldSchema) throws HCatException {
+        if (hcatFieldSchema != null && hcatFieldSchema.getArrayElementSchema().get(0).getType() != Type.STRUCT) {
+            return true;
+        }
+        // Column was not found in table schema. Its a new column
+        List<FieldSchema> tupSchema = bagFieldSchema.schema.getFields();
+        if (hcatFieldSchema == null && tupSchema.size() == 1 && (tupSchema.get(0).schema == null || (tupSchema.get(0).type == DataType.TUPLE && tupSchema.get(0).schema.size() == 1))) {
+            return true;
+        }
+        return false;
+    }
+
+
+    private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema) throws FrontendException, HCatException {
+        byte type = fSchema.type;
+        switch (type) {
+
+        case DataType.CHARARRAY:
+        case DataType.BIGCHARARRAY:
+            return new HCatFieldSchema(fSchema.alias, Type.STRING, null);
+
+        case DataType.INTEGER:
+            if (hcatFieldSchema != null) {
+                if (!SUPPORTED_INTEGER_CONVERSIONS.contains(hcatFieldSchema.getType())) {
+                    throw new FrontendException("Unsupported type: " + type + "  in Pig's schema",
+                        PigHCatUtil.PIG_EXCEPTION_CODE);
+                }
+                return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getType(), null);
+            } else {
+                return new HCatFieldSchema(fSchema.alias, Type.INT, null);
+            }
+
+        case DataType.LONG:
+            return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null);
+
+        case DataType.FLOAT:
+            return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null);
+
+        case DataType.DOUBLE:
+            return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null);
+
+        case DataType.BYTEARRAY:
+            return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
+
+        case DataType.BAG:
+            Schema bagSchema = fSchema.schema;
+            List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
+            FieldSchema field;
+            // Find out if we need to throw away the tuple or not.
+            if (removeTupleFromBag(hcatFieldSchema, fSchema)) {
+                field = bagSchema.getField(0).schema.getField(0);
+            } else {
+                field = bagSchema.getField(0);
+            }
+            arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema.getArrayElementSchema().get(0)));
+            return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), "");
+
+        case DataType.TUPLE:
+            List<String> fieldNames = new ArrayList<String>();
+            List<HCatFieldSchema> hcatFSs = new ArrayList<HCatFieldSchema>();
+            HCatSchema structSubSchema = hcatFieldSchema == null ? null : hcatFieldSchema.getStructSubSchema();
+            List<FieldSchema> fields = fSchema.schema.getFields();
+            for (int i = 0; i < fields.size(); i++) {
+                FieldSchema fieldSchema = fields.get(i);
+                fieldNames.add(fieldSchema.alias);
+                hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? null : structSubSchema.get(i)));
+            }
+            return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(hcatFSs), "");
+
+        case DataType.MAP: {
+            // Pig's schema contain no type information about map's keys and
+            // values. So, if its a new column assume <string,string> if its existing
+            // return whatever is contained in the existing column.
+
+            HCatFieldSchema valFS;
+            List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1);
+
+            if (hcatFieldSchema != null) {
+                return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, hcatFieldSchema.getMapValueSchema(), "");
+            }
+
+            // Column not found in target table. Its a new column. Its schema is map<string,string>
+            valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, "");
+            valFSList.add(valFS);
+            return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, new HCatSchema(valFSList), "");
+        }
+
+        default:
+            throw new FrontendException("Unsupported type: " + type + "  in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE);
+        }
+    }
+
+    @Override
+    public void prepareToWrite(RecordWriter writer) throws IOException {
+        this.writer = writer;
+        computedSchema = (HCatSchema) ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).getProperty(COMPUTED_OUTPUT_SCHEMA));
+    }
+
+    @Override
+    public void putNext(Tuple tuple) throws IOException {
+
+        List<Object> outgoing = new ArrayList<Object>(tuple.size());
+
+        int i = 0;
+        for (HCatFieldSchema fSchema : computedSchema.getFields()) {
+            outgoing.add(getJavaObj(tuple.get(i++), fSchema));
+        }
+        try {
+            writer.write(null, new DefaultHCatRecord(outgoing));
+        } catch (InterruptedException e) {
+            throw new BackendException("Error while writing tuple: " + tuple, PigHCatUtil.PIG_EXCEPTION_CODE, e);
+        }
+    }
+
+    private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatException, BackendException {
+        try {
+
+            // The real work-horse. Spend time and energy in this method if there is
+            // need to keep HCatStorer lean and go fast.
+            Type type = hcatFS.getType();
+            switch (type) {
+
+            case BINARY:
+                if (pigObj == null) {
+                    return null;
+                }
+                return ((DataByteArray) pigObj).get();
+
+            case STRUCT:
+                if (pigObj == null) {
+                    return null;
+                }
+                HCatSchema structSubSchema = hcatFS.getStructSubSchema();
+                // Unwrap the tuple.
+                List<Object> all = ((Tuple) pigObj).getAll();
+                ArrayList<Object> converted = new ArrayList<Object>(all.size());
+                for (int i = 0; i < all.size(); i++) {
+                    converted.add(getJavaObj(all.get(i), structSubSchema.get(i)));
+                }
+                return converted;
+
+            case ARRAY:
+                if (pigObj == null) {
+                    return null;
+                }
+                // Unwrap the bag.
+                DataBag pigBag = (DataBag) pigObj;
+                HCatFieldSchema tupFS = hcatFS.getArrayElementSchema().get(0);
+                boolean needTuple = tupFS.getType() == Type.STRUCT;
+                List<Object> bagContents = new ArrayList<Object>((int) pigBag.size());
+                Iterator<Tuple> bagItr = pigBag.iterator();
+
+                while (bagItr.hasNext()) {
+                    // If there is only one element in tuple contained in bag, we throw away the tuple.
+                    bagContents.add(getJavaObj(needTuple ? bagItr.next() : bagItr.next().get(0), tupFS));
+
+                }
+                return bagContents;
+            case MAP:
+                if (pigObj == null) {
+                    return null;
+                }
+                Map<?, ?> pigMap = (Map<?, ?>) pigObj;
+                Map<Object, Object> typeMap = new HashMap<Object, Object>();
+                for (Entry<?, ?> entry : pigMap.entrySet()) {
+                    // the value has a schema and not a FieldSchema
+                    typeMap.put(
+                        // Schema validation enforces that the Key is a String
+                        (String) entry.getKey(),
+                        getJavaObj(entry.getValue(), hcatFS.getMapValueSchema().get(0)));
+                }
+                return typeMap;
+            case STRING:
+            case INT:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+                return pigObj;
+            case SMALLINT:
+                if (pigObj == null) {
+                    return null;
+                }
+                if ((Integer) pigObj < Short.MIN_VALUE || (Integer) pigObj > Short.MAX_VALUE) {
+                    throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
+                        hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
+                }
+                return ((Integer) pigObj).shortValue();
+            case TINYINT:
+                if (pigObj == null) {
+                    return null;
+                }
+                if ((Integer) pigObj < Byte.MIN_VALUE || (Integer) pigObj > Byte.MAX_VALUE) {
+                    throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
+                        hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
+                }
+                return ((Integer) pigObj).byteValue();
+            case BOOLEAN:
+                // would not pass schema validation anyway
+                throw new BackendException("Incompatible type " + type + " found in hcat table schema: " + hcatFS, PigHCatUtil.PIG_EXCEPTION_CODE);
+            default:
+                throw new BackendException("Unexpected type " + type + " for value " + pigObj + (pigObj == null ? "" : " of class " + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+        } catch (BackendException e) {
+            // provide the path to the field in the error message
+            throw new BackendException(
+                (hcatFS.getName() == null ? " " : hcatFS.getName() + ".") + e.getMessage(),
+                e.getCause() == null ? e : e.getCause());
+        }
+    }
+
+    @Override
+    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+
+        // Need to necessarily override this method since default impl assumes HDFS
+        // based location string.
+        return location;
+    }
+
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+        sign = signature;
+    }
+
+
+    protected void doSchemaValidations(Schema pigSchema, HCatSchema tblSchema) throws FrontendException, HCatException {
+
+        // Iterate through all the elements in Pig Schema and do validations as
+        // dictated by semantics, consult HCatSchema of table when need be.
+
+        for (FieldSchema pigField : pigSchema.getFields()) {
+            HCatFieldSchema hcatField = getColFromSchema(pigField.alias, tblSchema);
+            validateSchema(pigField, hcatField);
+        }
+
+        try {
+            PigHCatUtil.validateHCatTableSchemaFollowsPigRules(tblSchema);
+        } catch (IOException e) {
+            throw new FrontendException("HCatalog schema is not compatible with Pig: " + e.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, e);
+        }
+    }
+
+
+    private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField)
+        throws HCatException, FrontendException {
+        validateAlias(pigField.alias);
+        byte type = pigField.type;
+        if (DataType.isComplex(type)) {
+            switch (type) {
+
+            case DataType.MAP:
+                if (hcatField != null) {
+                    if (hcatField.getMapKeyType() != Type.STRING) {
+                        throw new FrontendException("Key Type of map must be String " + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
+                    }
+                    // Map values can be primitive or complex
+                }
+                break;
+
+            case DataType.BAG:
+                HCatSchema arrayElementSchema = hcatField == null ? null : hcatField.getArrayElementSchema();
+                for (FieldSchema innerField : pigField.schema.getField(0).schema.getFields()) {
+                    validateSchema(innerField, getColFromSchema(pigField.alias, arrayElementSchema));
+                }
+                break;
+
+            case DataType.TUPLE:
+                HCatSchema structSubSchema = hcatField == null ? null : hcatField.getStructSubSchema();
+                for (FieldSchema innerField : pigField.schema.getFields()) {
+                    validateSchema(innerField, getColFromSchema(pigField.alias, structSubSchema));
+                }
+                break;
+
+            default:
+                throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+        }
+    }
+
+    private void validateAlias(String alias) throws FrontendException {
+        if (alias == null) {
+            throw new FrontendException("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.", PigHCatUtil.PIG_EXCEPTION_CODE);
+        }
+        if (alias.matches(".*[A-Z]+.*")) {
+            throw new FrontendException("Column names should all be in lowercase. Invalid name found: " + alias, PigHCatUtil.PIG_EXCEPTION_CODE);
+        }
+    }
+
+    // Finds column by name in HCatSchema, if not found returns null.
+    private HCatFieldSchema getColFromSchema(String alias, HCatSchema tblSchema) {
+        if (tblSchema != null) {
+            for (HCatFieldSchema hcatField : tblSchema.getFields()) {
+                if (hcatField != null && hcatField.getName() != null && hcatField.getName().equalsIgnoreCase(alias)) {
+                    return hcatField;
+                }
+            }
+        }
+        // Its a new column
+        return null;
+    }
+
+    @Override
+    public void cleanupOnFailure(String location, Job job) throws IOException {
+        // No-op.
+    }
+
+    @Override
+    public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException {
+    }
+}

Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximLoader.java.broken
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximLoader.java.broken?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximLoader.java.broken (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximLoader.java.broken Fri Sep  6 00:49:14 2013
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;
+import org.apache.hcatalog.mapreduce.HCatEximInputFormat;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Pig {@link LoadFunc} to read data/metadata from hcatalog exported location
+ */
+
+public class HCatEximLoader extends HCatBaseLoader {
+
+  private static final Log LOG = LogFactory.getLog(HCatEximLoader.class);
+
+  private HCatSchema tableSchema;
+  private HCatSchema partitionSchema;
+  private HCatEximInputFormat inputFormat;
+
+  public HCatEximLoader() {
+    LOG.debug("HCatEximLoader ctored");
+  }
+
+  @Override
+  public ResourceSchema getSchema(String location, Job job) throws IOException {
+    LOG.debug("getSchema with location :" + location);
+    if (tableSchema == null) {
+      List<HCatSchema> rv = HCatEximInputFormat.setInput(job, location, null);
+      tableSchema = rv.get(0);
+      partitionSchema = rv.get(1);
+    }
+    LOG.debug("getSchema got schema :" + tableSchema.toString());
+    List<HCatFieldSchema> colsPlusPartKeys = new ArrayList<HCatFieldSchema>();
+    colsPlusPartKeys.addAll(tableSchema.getFields());
+    colsPlusPartKeys.addAll(partitionSchema.getFields());
+    outputSchema = new HCatSchema(colsPlusPartKeys);
+    return PigHCatUtil.getResourceSchema(outputSchema);
+  }
+
+  @Override
+  public String[] getPartitionKeys(String location, Job job) throws IOException {
+    LOG.warn("getPartitionKeys with location :" + location);
+    /*
+    if (tableSchema == null) {
+      List<HCatSchema> rv = HCatEximInputFormat.setInput(job, location, null);
+      tableSchema = rv.get(0);
+      partitionSchema = rv.get(1);
+    }
+    return partitionSchema.getFieldNames().toArray(new String[0]);
+    */
+    return null;
+  }
+
+  @Override
+  public void setPartitionFilter(Expression partitionFilter) throws IOException {
+    LOG.debug("setPartitionFilter with filter :" + partitionFilter.toString());
+  }
+
+  @Override
+  public void setLocation(String location, Job job) throws IOException {
+    LOG.debug("setLocation with location :" + location);
+    List<HCatSchema> rv = HCatEximInputFormat.setInput(job, location, null);
+    tableSchema = rv.get(0);
+    partitionSchema = rv.get(1);
+    List<HCatFieldSchema> colsPlusPartKeys = new ArrayList<HCatFieldSchema>();
+    colsPlusPartKeys.addAll(tableSchema.getFields());
+    colsPlusPartKeys.addAll(partitionSchema.getFields());
+    outputSchema = new HCatSchema(colsPlusPartKeys);
+    UDFContext udfContext = UDFContext.getUDFContext();
+    Properties props = udfContext.getUDFProperties(this.getClass(),
+          new String[] {signature});
+    RequiredFieldList requiredFieldsInfo =
+          (RequiredFieldList) props.get(PRUNE_PROJECTION_INFO);
+    if (requiredFieldsInfo != null) {
+      ArrayList<HCatFieldSchema> fcols = new ArrayList<HCatFieldSchema>();
+      for (RequiredField rf : requiredFieldsInfo.getFields()) {
+        fcols.add(tableSchema.getFields().get(rf.getIndex()));
+      }
+      outputSchema = new HCatSchema(fcols);
+      try {
+        HCatBaseInputFormat.setOutputSchema(job, outputSchema);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+
+  @Override
+  public InputFormat getInputFormat() throws IOException {
+    if (inputFormat == null) {
+      inputFormat = new HCatEximInputFormat();
+    }
+    return inputFormat;
+  }
+
+}

Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximStorer.java.broken
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximStorer.java.broken?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximStorer.java.broken (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximStorer.java.broken Fri Sep  6 00:49:14 2013
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatEximOutputCommitter;
+import org.apache.hcatalog.mapreduce.HCatEximOutputFormat;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * HCatEximStorer.
+ *
+ */
+
+public class HCatEximStorer extends HCatBaseStorer {
+
+  private static final Log LOG = LogFactory.getLog(HCatEximStorer.class);
+
+  private final String outputLocation;
+
+  public HCatEximStorer(String outputLocation) throws Exception {
+    this(outputLocation, null, null);
+  }
+
+  public HCatEximStorer(String outputLocation, String partitionSpec) throws Exception {
+    this(outputLocation, partitionSpec, null);
+  }
+
+  public HCatEximStorer(String outputLocation, String partitionSpec, String schema)
+      throws Exception {
+    super(partitionSpec, schema);
+    this.outputLocation = outputLocation;
+    LOG.debug("HCatEximStorer called");
+  }
+
+  @Override
+  public OutputFormat getOutputFormat() throws IOException {
+    LOG.debug("getOutputFormat called");
+    return new HCatEximOutputFormat();
+  }
+
+  @Override
+  public void setStoreLocation(String location, Job job) throws IOException {
+    LOG.debug("setStoreLocation called with :" + location);
+    String[] userStr = location.split("\\.");
+    String dbname = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+    String tablename = null;
+    if (userStr.length == 2) {
+      dbname = userStr[0];
+      tablename = userStr[1];
+    } else {
+      tablename = userStr[0];
+    }
+    Properties p = UDFContext.getUDFContext()
+        .getUDFProperties(this.getClass(), new String[] {sign});
+    Configuration config = job.getConfiguration();
+    if (!HCatUtil.checkJobContextIfRunningFromBackend(job)) {
+      Schema schema = (Schema) ObjectSerializer.deserialize(p.getProperty(PIG_SCHEMA));
+      if (schema != null) {
+        pigSchema = schema;
+      }
+      if (pigSchema == null) {
+        throw new FrontendException("Schema for data cannot be determined.",
+            PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+      HCatSchema hcatTblSchema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+      try {
+        doSchemaValidations(pigSchema, hcatTblSchema);
+      } catch (HCatException he) {
+        throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+      }
+
+      List<HCatFieldSchema> hcatFields = new ArrayList<HCatFieldSchema>();
+      List<String> partVals = new ArrayList<String>();
+      for (String key : partitionKeys) {
+        hcatFields.add(new HCatFieldSchema(key, HCatFieldSchema.Type.STRING, ""));
+        partVals.add(partitions.get(key));
+      }
+
+      HCatSchema outputSchema = convertPigSchemaToHCatSchema(pigSchema,
+          hcatTblSchema);
+      LOG.debug("Pig Schema '" + pigSchema.toString() + "' was converted to HCatSchema '"
+          + outputSchema);
+      HCatEximOutputFormat.setOutput(job,
+          dbname, tablename,
+          outputLocation,
+          new HCatSchema(hcatFields),
+          partVals,
+          outputSchema);
+      p.setProperty(COMPUTED_OUTPUT_SCHEMA, ObjectSerializer.serialize(outputSchema));
+      p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+          config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+      if (config.get(HCatConstants.HCAT_KEY_HIVE_CONF) != null) {
+        p.setProperty(HCatConstants.HCAT_KEY_HIVE_CONF,
+            config.get(HCatConstants.HCAT_KEY_HIVE_CONF));
+      }
+    } else {
+      config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+          p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+      if (p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF) != null) {
+        config.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+            p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF));
+      }
+    }
+  }
+
+  @Override
+  public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
+    if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
+      //In local mode, mapreduce will not call OutputCommitter.cleanupJob.
+      //Calling it from here so that the partition publish happens.
+      //This call needs to be removed after MAPREDUCE-1447 is fixed.
+      new HCatEximOutputCommitter(job,null).cleanupJob(job);
+    }
+  }
+}

Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatContext;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.Pair;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.pig.Expression;
+import org.apache.pig.Expression.BinaryExpression;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Pig {@link org.apache.pig.LoadFunc} to read data from HCat
+ */
+
+public class HCatLoader extends HCatBaseLoader {
+
+    private static final String PARTITION_FILTER = "partition.filter"; // for future use
+
+    private HCatInputFormat hcatInputFormat = null;
+    private String dbName;
+    private String tableName;
+    private String hcatServerUri;
+    private String partitionFilterString;
+    private final PigHCatUtil phutil = new PigHCatUtil();
+
+    // Signature for wrapped loader, see comments in LoadFuncBasedInputDriver.initialize
+    final public static String INNER_SIGNATURE = "hcatloader.inner.signature";
+    final public static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature";
+    // A hash map which stores job credentials. The key is a signature passed by Pig, which is
+    //unique to the load func and input file name (table, in our case).
+    private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
+
+    @Override
+    public InputFormat<?, ?> getInputFormat() throws IOException {
+        if (hcatInputFormat == null) {
+            hcatInputFormat = new HCatInputFormat();
+        }
+        return hcatInputFormat;
+    }
+
+    @Override
+    public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+        return location;
+    }
+
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        HCatContext.INSTANCE.setConf(job.getConfiguration()).getConf().get()
+            .setBoolean(HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true);
+
+        UDFContext udfContext = UDFContext.getUDFContext();
+        Properties udfProps = udfContext.getUDFProperties(this.getClass(),
+            new String[]{signature});
+        job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + signature);
+        Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
+        dbName = dbTablePair.first;
+        tableName = dbTablePair.second;
+
+        RequiredFieldList requiredFieldsInfo = (RequiredFieldList) udfProps
+            .get(PRUNE_PROJECTION_INFO);
+        // get partitionFilterString stored in the UDFContext - it would have
+        // been stored there by an earlier call to setPartitionFilter
+        // call setInput on HCatInputFormat only in the frontend because internally
+        // it makes calls to the hcat server - we don't want these to happen in
+        // the backend
+        // in the hadoop front end mapred.task.id property will not be set in
+        // the Configuration
+        if (udfProps.containsKey(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET)) {
+            for (Enumeration<Object> emr = udfProps.keys(); emr.hasMoreElements(); ) {
+                PigHCatUtil.getConfigFromUDFProperties(udfProps,
+                    job.getConfiguration(), emr.nextElement().toString());
+            }
+            if (!HCatUtil.checkJobContextIfRunningFromBackend(job)) {
+                //Combine credentials and credentials from job takes precedence for freshness
+                Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + signature);
+                crd.addAll(job.getCredentials());
+                job.getCredentials().addAll(crd);
+            }
+        } else {
+            Job clone = new Job(job.getConfiguration());
+            HCatInputFormat.setInput(job, dbName, tableName).setFilter(getPartitionFilterString());
+
+            // We will store all the new /changed properties in the job in the
+            // udf context, so the the HCatInputFormat.setInput method need not
+            //be called many times.
+            for (Entry<String, String> keyValue : job.getConfiguration()) {
+                String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
+                if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
+                    udfProps.put(keyValue.getKey(), keyValue.getValue());
+                }
+            }
+            udfProps.put(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET, true);
+
+            //Store credentials in a private hash map and not the udf context to
+            // make sure they are not public.
+            Credentials crd = new Credentials();
+            crd.addAll(job.getCredentials());
+            jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + signature, crd);
+        }
+
+        // Need to also push projections by calling setOutputSchema on
+        // HCatInputFormat - we have to get the RequiredFields information
+        // from the UdfContext, translate it to an Schema and then pass it
+        // The reason we do this here is because setLocation() is called by
+        // Pig runtime at InputFormat.getSplits() and
+        // InputFormat.createRecordReader() time - we are not sure when
+        // HCatInputFormat needs to know about pruned projections - so doing it
+        // here will ensure we communicate to HCatInputFormat about pruned
+        // projections at getSplits() and createRecordReader() time
+
+        if (requiredFieldsInfo != null) {
+            // convert to hcatschema and pass to HCatInputFormat
+            try {
+                outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(), signature, this.getClass());
+                HCatInputFormat.setOutputSchema(job, outputSchema);
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        } else {
+            // else - this means pig's optimizer never invoked the pushProjection
+            // method - so we need all fields and hence we should not call the
+            // setOutputSchema on HCatInputFormat
+            if (HCatUtil.checkJobContextIfRunningFromBackend(job)) {
+                try {
+                    HCatSchema hcatTableSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA);
+                    outputSchema = hcatTableSchema;
+                    HCatInputFormat.setOutputSchema(job, outputSchema);
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+            }
+        }
+
+    }
+
+    @Override
+    public String[] getPartitionKeys(String location, Job job)
+        throws IOException {
+        Table table = phutil.getTable(location,
+            hcatServerUri != null ? hcatServerUri : PigHCatUtil.getHCatServerUri(job),
+            PigHCatUtil.getHCatServerPrincipal(job));
+        List<FieldSchema> tablePartitionKeys = table.getPartitionKeys();
+        String[] partitionKeys = new String[tablePartitionKeys.size()];
+        for (int i = 0; i < tablePartitionKeys.size(); i++) {
+            partitionKeys[i] = tablePartitionKeys.get(i).getName();
+        }
+        return partitionKeys;
+    }
+
+    @Override
+    public ResourceSchema getSchema(String location, Job job) throws IOException {
+        HCatContext.INSTANCE.setConf(job.getConfiguration()).getConf().get()
+            .setBoolean(HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true);
+
+        Table table = phutil.getTable(location,
+            hcatServerUri != null ? hcatServerUri : PigHCatUtil.getHCatServerUri(job),
+            PigHCatUtil.getHCatServerPrincipal(job));
+        HCatSchema hcatTableSchema = HCatUtil.getTableSchemaWithPtnCols(table);
+        try {
+            PigHCatUtil.validateHCatTableSchemaFollowsPigRules(hcatTableSchema);
+        } catch (IOException e) {
+            throw new PigException(
+                "Table schema incompatible for reading through HCatLoader :" + e.getMessage()
+                    + ";[Table schema was " + hcatTableSchema.toString() + "]"
+                , PigHCatUtil.PIG_EXCEPTION_CODE, e);
+        }
+        storeInUDFContext(signature, HCatConstants.HCAT_TABLE_SCHEMA, hcatTableSchema);
+        outputSchema = hcatTableSchema;
+        return PigHCatUtil.getResourceSchema(hcatTableSchema);
+    }
+
+    @Override
+    public void setPartitionFilter(Expression partitionFilter) throws IOException {
+        // convert the partition filter expression into a string expected by
+        // hcat and pass it in setLocation()
+
+        partitionFilterString = getHCatComparisonString(partitionFilter);
+
+        // store this in the udf context so we can get it later
+        storeInUDFContext(signature,
+            PARTITION_FILTER, partitionFilterString);
+    }
+
+    /**
+     * Get statistics about the data to be loaded. Only input data size is implemented at this time.
+     */
+    @Override
+    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+        try {
+            ResourceStatistics stats = new ResourceStatistics();
+            InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(
+                job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
+            stats.setmBytes(getSizeInBytes(inputJobInfo) / 1024 / 1024);
+            return stats;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    private String getPartitionFilterString() {
+        if (partitionFilterString == null) {
+            Properties props = UDFContext.getUDFContext().getUDFProperties(
+                this.getClass(), new String[]{signature});
+            partitionFilterString = props.getProperty(PARTITION_FILTER);
+        }
+        return partitionFilterString;
+    }
+
+    private String getHCatComparisonString(Expression expr) {
+        if (expr instanceof BinaryExpression) {
+            // call getHCatComparisonString on lhs and rhs, and and join the
+            // results with OpType string
+
+            // we can just use OpType.toString() on all Expression types except
+            // Equal, NotEqualt since Equal has '==' in toString() and
+            // we need '='
+            String opStr = null;
+            switch (expr.getOpType()) {
+            case OP_EQ:
+                opStr = " = ";
+                break;
+            default:
+                opStr = expr.getOpType().toString();
+            }
+            BinaryExpression be = (BinaryExpression) expr;
+            return "(" + getHCatComparisonString(be.getLhs()) +
+                opStr +
+                getHCatComparisonString(be.getRhs()) + ")";
+        } else {
+            // should be a constant or column
+            return expr.toString();
+        }
+    }
+
+}

Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatContext;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * HCatStorer.
+ *
+ */
+
+public class HCatStorer extends HCatBaseStorer {
+
+    // Signature for wrapped storer, see comments in LoadFuncBasedInputDriver.initialize
+    final public static String INNER_SIGNATURE = "hcatstorer.inner.signature";
+    final public static String INNER_SIGNATURE_PREFIX = "hcatstorer_inner_signature";
+    // A hash map which stores job credentials. The key is a signature passed by Pig, which is
+    //unique to the store func and out file name (table, in our case).
+    private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
+
+
+    public HCatStorer(String partSpecs, String schema) throws Exception {
+        super(partSpecs, schema);
+    }
+
+    public HCatStorer(String partSpecs) throws Exception {
+        this(partSpecs, null);
+    }
+
+    public HCatStorer() throws Exception {
+        this(null, null);
+    }
+
+    @Override
+    public OutputFormat getOutputFormat() throws IOException {
+        return new HCatOutputFormat();
+    }
+
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+        HCatContext.INSTANCE.setConf(job.getConfiguration()).getConf().get()
+            .setBoolean(HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, false);
+
+        Configuration config = job.getConfiguration();
+        config.set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
+        Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
+            this.getClass(), new String[]{sign});
+        String[] userStr = location.split("\\.");
+
+        if (udfProps.containsKey(HCatConstants.HCAT_PIG_STORER_LOCATION_SET)) {
+            for (Enumeration<Object> emr = udfProps.keys(); emr.hasMoreElements(); ) {
+                PigHCatUtil.getConfigFromUDFProperties(udfProps, config, emr.nextElement().toString());
+            }
+            Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + sign);
+            if (crd != null) {
+                job.getCredentials().addAll(crd);
+            }
+        } else {
+            Job clone = new Job(job.getConfiguration());
+            OutputJobInfo outputJobInfo;
+            if (userStr.length == 2) {
+                outputJobInfo = OutputJobInfo.create(userStr[0], userStr[1], partitions);
+            } else if (userStr.length == 1) {
+                outputJobInfo = OutputJobInfo.create(null, userStr[0], partitions);
+            } else {
+                throw new FrontendException("location " + location
+                    + " is invalid. It must be of the form [db.]table",
+                    PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+            Schema schema = (Schema) ObjectSerializer.deserialize(udfProps.getProperty(PIG_SCHEMA));
+            if (schema != null) {
+                pigSchema = schema;
+            }
+            if (pigSchema == null) {
+                throw new FrontendException(
+                    "Schema for data cannot be determined.",
+                    PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+            String externalLocation = (String) udfProps.getProperty(HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION);
+            if (externalLocation != null) {
+                outputJobInfo.setLocation(externalLocation);
+            }
+            try {
+                HCatOutputFormat.setOutput(job, outputJobInfo);
+            } catch (HCatException he) {
+                // pass the message to the user - essentially something about
+                // the table
+                // information passed to HCatOutputFormat was not right
+                throw new PigException(he.getMessage(),
+                    PigHCatUtil.PIG_EXCEPTION_CODE, he);
+            }
+            HCatSchema hcatTblSchema = HCatOutputFormat.getTableSchema(job);
+            try {
+                doSchemaValidations(pigSchema, hcatTblSchema);
+            } catch (HCatException he) {
+                throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+            }
+            computedSchema = convertPigSchemaToHCatSchema(pigSchema, hcatTblSchema);
+            HCatOutputFormat.setSchema(job, computedSchema);
+            udfProps.setProperty(COMPUTED_OUTPUT_SCHEMA, ObjectSerializer.serialize(computedSchema));
+
+            // We will store all the new /changed properties in the job in the
+            // udf context, so the the HCatOutputFormat.setOutput and setSchema
+            // methods need not be called many times.
+            for (Entry<String, String> keyValue : job.getConfiguration()) {
+                String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
+                if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
+                    udfProps.put(keyValue.getKey(), keyValue.getValue());
+                }
+            }
+            //Store credentials in a private hash map and not the udf context to
+            // make sure they are not public.
+            jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + sign, job.getCredentials());
+            udfProps.put(HCatConstants.HCAT_PIG_STORER_LOCATION_SET, true);
+        }
+    }
+
+    @Override
+    public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
+        HCatHadoopShims.Instance.get().commitJob(getOutputFormat(), job);
+    }
+
+    @Override
+    public void cleanupOnFailure(String location, Job job) throws IOException {
+        HCatHadoopShims.Instance.get().abortJob(getOutputFormat(), job);
+    }
+}

Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,488 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.Pair;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.pig.LoadPushDown.RequiredField;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PigHCatUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PigHCatUtil.class);
+
+    static final int PIG_EXCEPTION_CODE = 1115; // http://wiki.apache.org/pig/PigErrorHandlingFunctionalSpecification#Error_codes
+    private static final String DEFAULT_DB = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+
+    private final Map<Pair<String, String>, Table> hcatTableCache =
+        new HashMap<Pair<String, String>, Table>();
+
+    private static final TupleFactory tupFac = TupleFactory.getInstance();
+
+    private static boolean pigHasBooleanSupport = false;
+
+    /**
+     * Determine if the current Pig version supports boolean columns. This works around a
+     * dependency conflict preventing HCatalog from requiring a version of Pig with boolean
+     * field support and should be removed once HCATALOG-466 has been resolved.
+     */
+    static {
+        // DETAILS:
+        //
+        // PIG-1429 added support for boolean fields, which shipped in 0.10.0;
+        // this version of Pig depends on antlr 3.4.
+        //
+        // HCatalog depends heavily on Hive, which at this time uses antlr 3.0.1.
+        //
+        // antlr 3.0.1 and 3.4 are incompatible, so Pig 0.10.0 and Hive cannot be depended on in the
+        // same project. Pig 0.8.0 did not use antlr for its parser and can coexist with Hive,
+        // so that Pig version is depended on by HCatalog at this time.
+        try {
+            Schema schema = Utils.getSchemaFromString("myBooleanField: boolean");
+            pigHasBooleanSupport = (schema.getField("myBooleanField").type == DataType.BOOLEAN);
+        } catch (Throwable e) {
+            // pass
+        }
+
+        if (!pigHasBooleanSupport) {
+            LOG.info("This version of Pig does not support boolean fields. To enable "
+                    + "boolean-to-integer conversion, set the "
+                    + HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER
+                    + "=true configuration parameter.");
+        }
+    }
+
+    static public Pair<String, String> getDBTableNames(String location) throws IOException {
+        // the location string will be of the form:
+        // <database name>.<table name> - parse it and
+        // communicate the information to HCatInputFormat
+
+        try {
+            return HCatUtil.getDbAndTableName(location);
+        } catch (IOException e) {
+            String locationErrMsg = "The input location in load statement " +
+                "should be of the form " +
+                "<databasename>.<table name> or <table name>. Got " + location;
+            throw new PigException(locationErrMsg, PIG_EXCEPTION_CODE);
+        }
+    }
+
+    static public String getHCatServerUri(Job job) {
+
+        return job.getConfiguration().get(HiveConf.ConfVars.METASTOREURIS.varname);
+    }
+
+    static public String getHCatServerPrincipal(Job job) {
+
+        return job.getConfiguration().get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
+    }
+
+    private static HiveMetaStoreClient getHiveMetaClient(String serverUri,
+                                                         String serverKerberosPrincipal, Class<?> clazz) throws Exception {
+        HiveConf hiveConf = new HiveConf(clazz);
+
+        if (serverUri != null) {
+            hiveConf.set("hive.metastore.local", "false");
+            hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, serverUri.trim());
+        }
+
+        if (serverKerberosPrincipal != null) {
+            hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
+            hiveConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, serverKerberosPrincipal);
+        }
+
+        try {
+            return HCatUtil.getHiveClient(hiveConf);
+        } catch (Exception e) {
+            throw new Exception("Could not instantiate a HiveMetaStoreClient connecting to server uri:[" + serverUri + "]", e);
+        }
+    }
+
+
+    HCatSchema getHCatSchema(List<RequiredField> fields, String signature, Class<?> classForUDFCLookup) throws IOException {
+        if (fields == null) {
+            return null;
+        }
+
+        Properties props = UDFContext.getUDFContext().getUDFProperties(
+            classForUDFCLookup, new String[]{signature});
+        HCatSchema hcatTableSchema = (HCatSchema) props.get(HCatConstants.HCAT_TABLE_SCHEMA);
+
+        ArrayList<HCatFieldSchema> fcols = new ArrayList<HCatFieldSchema>();
+        for (RequiredField rf : fields) {
+            fcols.add(hcatTableSchema.getFields().get(rf.getIndex()));
+        }
+        return new HCatSchema(fcols);
+    }
+
+    public Table getTable(String location, String hcatServerUri, String hcatServerPrincipal) throws IOException {
+        Pair<String, String> loc_server = new Pair<String, String>(location, hcatServerUri);
+        Table hcatTable = hcatTableCache.get(loc_server);
+        if (hcatTable != null) {
+            return hcatTable;
+        }
+
+        Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
+        String dbName = dbTablePair.first;
+        String tableName = dbTablePair.second;
+        Table table = null;
+        HiveMetaStoreClient client = null;
+        try {
+            client = getHiveMetaClient(hcatServerUri, hcatServerPrincipal, PigHCatUtil.class);
+            table = HCatUtil.getTable(client, dbName, tableName);
+        } catch (NoSuchObjectException nsoe) {
+            throw new PigException("Table not found : " + nsoe.getMessage(), PIG_EXCEPTION_CODE); // prettier error messages to frontend
+        } catch (Exception e) {
+            throw new IOException(e);
+        } finally {
+            HCatUtil.closeHiveClientQuietly(client);
+        }
+        hcatTableCache.put(loc_server, table);
+        return table;
+    }
+
+    public static ResourceSchema getResourceSchema(HCatSchema hcatSchema) throws IOException {
+
+        List<ResourceFieldSchema> rfSchemaList = new ArrayList<ResourceFieldSchema>();
+        for (HCatFieldSchema hfs : hcatSchema.getFields()) {
+            ResourceFieldSchema rfSchema;
+            rfSchema = getResourceSchemaFromFieldSchema(hfs);
+            rfSchemaList.add(rfSchema);
+        }
+        ResourceSchema rSchema = new ResourceSchema();
+        rSchema.setFields(rfSchemaList.toArray(new ResourceFieldSchema[0]));
+        return rSchema;
+
+    }
+
+    private static ResourceFieldSchema getResourceSchemaFromFieldSchema(HCatFieldSchema hfs)
+        throws IOException {
+        ResourceFieldSchema rfSchema;
+        // if we are dealing with a bag or tuple column - need to worry about subschema
+        if (hfs.getType() == Type.STRUCT) {
+            rfSchema = new ResourceFieldSchema()
+                .setName(hfs.getName())
+                .setDescription(hfs.getComment())
+                .setType(getPigType(hfs))
+                .setSchema(getTupleSubSchema(hfs));
+        } else if (hfs.getType() == Type.ARRAY) {
+            rfSchema = new ResourceFieldSchema()
+                .setName(hfs.getName())
+                .setDescription(hfs.getComment())
+                .setType(getPigType(hfs))
+                .setSchema(getBagSubSchema(hfs));
+        } else {
+            rfSchema = new ResourceFieldSchema()
+                .setName(hfs.getName())
+                .setDescription(hfs.getComment())
+                .setType(getPigType(hfs))
+                .setSchema(null); // no munging inner-schemas
+        }
+        return rfSchema;
+    }
+
+    protected static ResourceSchema getBagSubSchema(HCatFieldSchema hfs) throws IOException {
+        // there are two cases - array<Type> and array<struct<...>>
+        // in either case the element type of the array is represented in a
+        // tuple field schema in the bag's field schema - the second case (struct)
+        // more naturally translates to the tuple - in the first case (array<Type>)
+        // we simulate the tuple by putting the single field in a tuple
+
+        Properties props = UDFContext.getUDFContext().getClientSystemProps();
+        String innerTupleName = HCatConstants.HCAT_PIG_INNER_TUPLE_NAME_DEFAULT;
+        if (props != null && props.containsKey(HCatConstants.HCAT_PIG_INNER_TUPLE_NAME)) {
+            innerTupleName = props.getProperty(HCatConstants.HCAT_PIG_INNER_TUPLE_NAME)
+                .replaceAll("FIELDNAME", hfs.getName());
+        }
+        String innerFieldName = HCatConstants.HCAT_PIG_INNER_FIELD_NAME_DEFAULT;
+        if (props != null && props.containsKey(HCatConstants.HCAT_PIG_INNER_FIELD_NAME)) {
+            innerFieldName = props.getProperty(HCatConstants.HCAT_PIG_INNER_FIELD_NAME)
+                .replaceAll("FIELDNAME", hfs.getName());
+        }
+
+        ResourceFieldSchema[] bagSubFieldSchemas = new ResourceFieldSchema[1];
+        bagSubFieldSchemas[0] = new ResourceFieldSchema().setName(innerTupleName)
+            .setDescription("The tuple in the bag")
+            .setType(DataType.TUPLE);
+        HCatFieldSchema arrayElementFieldSchema = hfs.getArrayElementSchema().get(0);
+        if (arrayElementFieldSchema.getType() == Type.STRUCT) {
+            bagSubFieldSchemas[0].setSchema(getTupleSubSchema(arrayElementFieldSchema));
+        } else if (arrayElementFieldSchema.getType() == Type.ARRAY) {
+            ResourceSchema s = new ResourceSchema();
+            List<ResourceFieldSchema> lrfs = Arrays.asList(getResourceSchemaFromFieldSchema(arrayElementFieldSchema));
+            s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
+            bagSubFieldSchemas[0].setSchema(s);
+        } else {
+            ResourceFieldSchema[] innerTupleFieldSchemas = new ResourceFieldSchema[1];
+            innerTupleFieldSchemas[0] = new ResourceFieldSchema().setName(innerFieldName)
+                .setDescription("The inner field in the tuple in the bag")
+                .setType(getPigType(arrayElementFieldSchema))
+                .setSchema(null); // the element type is not a tuple - so no subschema
+            bagSubFieldSchemas[0].setSchema(new ResourceSchema().setFields(innerTupleFieldSchemas));
+        }
+        ResourceSchema s = new ResourceSchema().setFields(bagSubFieldSchemas);
+        return s;
+
+    }
+
+    private static ResourceSchema getTupleSubSchema(HCatFieldSchema hfs) throws IOException {
+        // for each struct subfield, create equivalent ResourceFieldSchema
+        ResourceSchema s = new ResourceSchema();
+        List<ResourceFieldSchema> lrfs = new ArrayList<ResourceFieldSchema>();
+        for (HCatFieldSchema subField : hfs.getStructSubSchema().getFields()) {
+            lrfs.add(getResourceSchemaFromFieldSchema(subField));
+        }
+        s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
+        return s;
+    }
+
+    /**
+     * @param hfs the field schema of the column
+     * @return corresponding pig type
+     * @throws IOException
+     */
+    static public byte getPigType(HCatFieldSchema hfs) throws IOException {
+        return getPigType(hfs.getType());
+    }
+
+    static public byte getPigType(Type type) throws IOException {
+        if (type == Type.STRING) {
+            return DataType.CHARARRAY;
+        }
+
+        if ((type == Type.INT) || (type == Type.SMALLINT) || (type == Type.TINYINT)) {
+            return DataType.INTEGER;
+        }
+
+        if (type == Type.ARRAY) {
+            return DataType.BAG;
+        }
+
+        if (type == Type.STRUCT) {
+            return DataType.TUPLE;
+        }
+
+        if (type == Type.MAP) {
+            return DataType.MAP;
+        }
+
+        if (type == Type.BIGINT) {
+            return DataType.LONG;
+        }
+
+        if (type == Type.FLOAT) {
+            return DataType.FLOAT;
+        }
+
+        if (type == Type.DOUBLE) {
+            return DataType.DOUBLE;
+        }
+
+        if (type == Type.BINARY) {
+            return DataType.BYTEARRAY;
+        }
+
+        if (type == Type.BOOLEAN && pigHasBooleanSupport) {
+            return DataType.BOOLEAN;
+        }
+
+        throw new PigException("HCatalog column type '" + type.toString()
+                + "' is not supported in Pig as a column type", PIG_EXCEPTION_CODE);
+    }
+
+    public static Tuple transformToTuple(HCatRecord hr, HCatSchema hs) throws Exception {
+        if (hr == null) {
+            return null;
+        }
+        return transformToTuple(hr.getAll(), hs);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static Object extractPigObject(Object o, HCatFieldSchema hfs) throws Exception {
+        Object result;
+        Type itemType = hfs.getType();
+        switch (itemType) {
+        case BINARY:
+            result = (o == null) ? null : new DataByteArray((byte[]) o);
+            break;
+        case STRUCT:
+            result = transformToTuple((List<Object>) o, hfs);
+            break;
+        case ARRAY:
+            result = transformToBag((List<? extends Object>) o, hfs);
+            break;
+        case MAP:
+            result = transformToPigMap((Map<Object, Object>) o, hfs);
+            break;
+        default:
+            result = o;
+            break;
+        }
+        return result;
+    }
+
+    private static Tuple transformToTuple(List<? extends Object> objList, HCatFieldSchema hfs) throws Exception {
+        try {
+            return transformToTuple(objList, hfs.getStructSubSchema());
+        } catch (Exception e) {
+            if (hfs.getType() != Type.STRUCT) {
+                throw new Exception("Expected Struct type, got " + hfs.getType(), e);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    private static Tuple transformToTuple(List<? extends Object> objList, HCatSchema hs) throws Exception {
+        if (objList == null) {
+            return null;
+        }
+        Tuple t = tupFac.newTuple(objList.size());
+        List<HCatFieldSchema> subFields = hs.getFields();
+        for (int i = 0; i < subFields.size(); i++) {
+            t.set(i, extractPigObject(objList.get(i), subFields.get(i)));
+        }
+        return t;
+    }
+
+    private static Map<String, Object> transformToPigMap(Map<Object, Object> map, HCatFieldSchema hfs) throws Exception {
+        if (map == null) {
+            return null;
+        }
+
+        Map<String, Object> result = new HashMap<String, Object>();
+        for (Entry<Object, Object> entry : map.entrySet()) {
+            // since map key for Pig has to be Strings
+            result.put(entry.getKey().toString(), extractPigObject(entry.getValue(), hfs.getMapValueSchema().get(0)));
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static DataBag transformToBag(List<? extends Object> list, HCatFieldSchema hfs) throws Exception {
+        if (list == null) {
+            return null;
+        }
+
+        HCatFieldSchema elementSubFieldSchema = hfs.getArrayElementSchema().getFields().get(0);
+        DataBag db = new DefaultDataBag();
+        for (Object o : list) {
+            Tuple tuple;
+            if (elementSubFieldSchema.getType() == Type.STRUCT) {
+                tuple = transformToTuple((List<Object>) o, elementSubFieldSchema);
+            } else {
+                // bags always contain tuples
+                tuple = tupFac.newTuple(extractPigObject(o, elementSubFieldSchema));
+            }
+            db.add(tuple);
+        }
+        return db;
+    }
+
+
+    private static void validateHCatSchemaFollowsPigRules(HCatSchema tblSchema) throws PigException {
+        for (HCatFieldSchema hcatField : tblSchema.getFields()) {
+            validateHcatFieldFollowsPigRules(hcatField);
+        }
+    }
+
+    private static void validateHcatFieldFollowsPigRules(HCatFieldSchema hcatField) throws PigException {
+        try {
+            Type hType = hcatField.getType();
+            switch (hType) {
+            case BOOLEAN:
+                if (!pigHasBooleanSupport) {
+                    throw new PigException("Incompatible type found in HCat table schema: "
+                            + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
+                }
+                break;
+            case ARRAY:
+                validateHCatSchemaFollowsPigRules(hcatField.getArrayElementSchema());
+                break;
+            case STRUCT:
+                validateHCatSchemaFollowsPigRules(hcatField.getStructSubSchema());
+                break;
+            case MAP:
+                // key is only string
+                if (hcatField.getMapKeyType() != Type.STRING) {
+                    LOG.info("Converting non-String key of map " + hcatField.getName() + " from "
+                        + hcatField.getMapKeyType() + " to String.");
+                }
+                validateHCatSchemaFollowsPigRules(hcatField.getMapValueSchema());
+                break;
+            }
+        } catch (HCatException e) {
+            throw new PigException("Incompatible type found in hcat table schema: " + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE, e);
+        }
+    }
+
+
+    public static void validateHCatTableSchemaFollowsPigRules(HCatSchema hcatTableSchema) throws IOException {
+        validateHCatSchemaFollowsPigRules(hcatTableSchema);
+    }
+
+    public static void getConfigFromUDFProperties(Properties p, Configuration config, String propName) {
+        if (p.getProperty(propName) != null) {
+            config.set(propName, p.getProperty(propName));
+        }
+    }
+
+    public static void saveConfigIntoUDFProperties(Properties p, Configuration config, String propName) {
+        if (config.get(propName) != null) {
+            p.setProperty(propName, config.get(propName));
+        }
+    }
+
+}

Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * This class is used to test the HCAT_PIG_STORER_EXTERNAL_LOCATION property used in HCatStorer.
+ * When this property is set, HCatStorer writes the output to the location it specifies. Since
+ * the property can only be set in the UDFContext, we need this simpler wrapper to do three things:
+ * <ol>
+ * <li> save the external dir specified in the Pig script </li>
+ * <li> set the same UDFContext signature as HCatStorer </li>
+ * <li> before {@link HCatStorer#setStoreLocation(String, Job)}, set the external dir in the UDFContext.</li>
+ * </ol>
+ */
+public class HCatStorerWrapper extends HCatStorer {
+
+    private String sign;
+    private String externalDir;
+
+    public HCatStorerWrapper(String partSpecs, String schema, String externalDir) throws Exception {
+	super(partSpecs, schema);
+	this.externalDir = externalDir;
+    }
+
+    public HCatStorerWrapper(String partSpecs, String externalDir) throws Exception {
+	super(partSpecs);
+	this.externalDir = externalDir;
+    }
+
+    public HCatStorerWrapper(String externalDir) throws Exception{
+	super();
+	this.externalDir = externalDir;
+    }
+
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+	Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
+		this.getClass(), new String[] { sign });
+	udfProps.setProperty(HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION, externalDir);
+	super.setStoreLocation(location, job);
+    }
+
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+	sign = signature;
+	super.setStoreFuncUDFContextSignature(signature);
+    }
+}