You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by tr...@apache.org on 2012/09/10 23:29:03 UTC

svn commit: r1383152 [2/27] - in /incubator/hcatalog/trunk: ./ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/ s...

Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java Mon Sep 10 23:28:55 2012
@@ -60,382 +60,382 @@ import org.apache.pig.impl.util.Utils;
 
 public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata {
 
-  private static final List<Type> SUPPORTED_INTEGER_CONVERSIONS =
-      Lists.newArrayList(Type.TINYINT, Type.SMALLINT, Type.INT);
-  protected static final String COMPUTED_OUTPUT_SCHEMA = "hcat.output.schema";
-  protected final List<String> partitionKeys;
-  protected final Map<String,String> partitions;
-  protected Schema pigSchema;
-  private RecordWriter<WritableComparable<?>, HCatRecord> writer;
-  protected HCatSchema computedSchema;
-  protected static final String PIG_SCHEMA = "hcat.pig.store.schema";
-  protected String sign;
-
-  public HCatBaseStorer(String partSpecs, String schema) throws Exception {
-
-    partitionKeys = new ArrayList<String>();
-    partitions = new HashMap<String, String>();
-    if(partSpecs != null && !partSpecs.trim().isEmpty()){
-      String[] partKVPs = partSpecs.split(",");
-      for(String partKVP : partKVPs){
-        String[] partKV = partKVP.split("=");
-        if(partKV.length == 2) {
-          String partKey = partKV[0].trim();
-          partitionKeys.add(partKey);
-          partitions.put(partKey, partKV[1].trim());
-        } else {
-          throw new FrontendException("Invalid partition column specification. "+partSpecs, PigHCatUtil.PIG_EXCEPTION_CODE);
+    private static final List<Type> SUPPORTED_INTEGER_CONVERSIONS =
+        Lists.newArrayList(Type.TINYINT, Type.SMALLINT, Type.INT);
+    protected static final String COMPUTED_OUTPUT_SCHEMA = "hcat.output.schema";
+    protected final List<String> partitionKeys;
+    protected final Map<String, String> partitions;
+    protected Schema pigSchema;
+    private RecordWriter<WritableComparable<?>, HCatRecord> writer;
+    protected HCatSchema computedSchema;
+    protected static final String PIG_SCHEMA = "hcat.pig.store.schema";
+    protected String sign;
+
+    public HCatBaseStorer(String partSpecs, String schema) throws Exception {
+
+        partitionKeys = new ArrayList<String>();
+        partitions = new HashMap<String, String>();
+        if (partSpecs != null && !partSpecs.trim().isEmpty()) {
+            String[] partKVPs = partSpecs.split(",");
+            for (String partKVP : partKVPs) {
+                String[] partKV = partKVP.split("=");
+                if (partKV.length == 2) {
+                    String partKey = partKV[0].trim();
+                    partitionKeys.add(partKey);
+                    partitions.put(partKey, partKV[1].trim());
+                } else {
+                    throw new FrontendException("Invalid partition column specification. " + partSpecs, PigHCatUtil.PIG_EXCEPTION_CODE);
+                }
+            }
+        }
+
+        if (schema != null) {
+            pigSchema = Utils.getSchemaFromString(schema);
         }
-      }
-    }
 
-    if(schema != null) {
-      pigSchema = Utils.getSchemaFromString(schema);
     }
 
-  }
-
-  @Override
-  public void checkSchema(ResourceSchema resourceSchema) throws IOException {
+    @Override
+    public void checkSchema(ResourceSchema resourceSchema) throws IOException {
+
+        /*  Schema provided by user and the schema computed by Pig
+        * at the time of calling store must match.
+        */
+        Schema runtimeSchema = Schema.getPigSchema(resourceSchema);
+        if (pigSchema != null) {
+            if (!Schema.equals(runtimeSchema, pigSchema, false, true)) {
+                throw new FrontendException("Schema provided in store statement doesn't match with the Schema" +
+                    "returned by Pig run-time. Schema provided in HCatStorer: " + pigSchema.toString() + " Schema received from Pig runtime: " + runtimeSchema.toString(), PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+        } else {
+            pigSchema = runtimeSchema;
+        }
+        UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).setProperty(PIG_SCHEMA, ObjectSerializer.serialize(pigSchema));
+    }
 
-    /*  Schema provided by user and the schema computed by Pig
-     * at the time of calling store must match.
+    /** Constructs HCatSchema from pigSchema. Passed tableSchema is the existing
+     * schema of the table in metastore.
      */
-    Schema runtimeSchema = Schema.getPigSchema(resourceSchema);
-    if(pigSchema != null){
-      if(! Schema.equals(runtimeSchema, pigSchema, false, true) ){
-        throw new FrontendException("Schema provided in store statement doesn't match with the Schema" +
-            "returned by Pig run-time. Schema provided in HCatStorer: "+pigSchema.toString()+ " Schema received from Pig runtime: "+runtimeSchema.toString(), PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
-    } else {
-      pigSchema = runtimeSchema;
-    }
-    UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).setProperty(PIG_SCHEMA,ObjectSerializer.serialize(pigSchema));
-  }
-
-  /** Constructs HCatSchema from pigSchema. Passed tableSchema is the existing
-   * schema of the table in metastore.
-   */
-  protected HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException{
-    List<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(pigSchema.size());
-    for(FieldSchema fSchema : pigSchema.getFields()){
-      try {
-        HCatFieldSchema hcatFieldSchema = getColFromSchema(fSchema.alias, tableSchema);
-
-        fieldSchemas.add(getHCatFSFromPigFS(fSchema, hcatFieldSchema));
-      } catch (HCatException he){
-          throw new FrontendException(he.getMessage(),PigHCatUtil.PIG_EXCEPTION_CODE,he);
-      }
-    }
-    return new HCatSchema(fieldSchemas);
-  }
-
-  public static boolean removeTupleFromBag(HCatFieldSchema hcatFieldSchema, FieldSchema bagFieldSchema) throws HCatException{
-    if (hcatFieldSchema != null && hcatFieldSchema.getArrayElementSchema().get(0).getType() != Type.STRUCT) {
-      return true;
-    }
-    // Column was not found in table schema. Its a new column
-    List<FieldSchema> tupSchema = bagFieldSchema.schema.getFields();
-    if (hcatFieldSchema == null && tupSchema.size() == 1 && (tupSchema.get(0).schema == null || (tupSchema.get(0).type == DataType.TUPLE && tupSchema.get(0).schema.size() == 1))) {
-      return true;
-    }
-    return false;
-  }
-
-
-  private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema) throws FrontendException, HCatException{
-    byte type = fSchema.type;
-    switch(type){
-
-    case DataType.CHARARRAY:
-    case DataType.BIGCHARARRAY:
-      return new HCatFieldSchema(fSchema.alias, Type.STRING, null);
-
-    case DataType.INTEGER:
-      if (hcatFieldSchema != null) {
-        if (!SUPPORTED_INTEGER_CONVERSIONS.contains(hcatFieldSchema.getType())) {
-          throw new FrontendException("Unsupported type: " + type + "  in Pig's schema",
-            PigHCatUtil.PIG_EXCEPTION_CODE);
-        }
-        return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getType(), null);
-      } else {
-        return new HCatFieldSchema(fSchema.alias, Type.INT, null);
-      }
-
-    case DataType.LONG:
-      return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null);
-
-    case DataType.FLOAT:
-      return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null);
-
-    case DataType.DOUBLE:
-      return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null);
-
-    case DataType.BYTEARRAY:
-      return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
-
-    case DataType.BAG:
-      Schema bagSchema = fSchema.schema;
-      List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
-      FieldSchema field;
-      // Find out if we need to throw away the tuple or not.
-      if (removeTupleFromBag(hcatFieldSchema, fSchema)) {
-        field = bagSchema.getField(0).schema.getField(0);
-      } else {
-        field = bagSchema.getField(0);
-      }
-      arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema.getArrayElementSchema().get(0)));
-      return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), "");
-
-    case DataType.TUPLE:
-      List<String> fieldNames = new ArrayList<String>();
-      List<HCatFieldSchema> hcatFSs = new ArrayList<HCatFieldSchema>();
-      HCatSchema structSubSchema = hcatFieldSchema == null ? null : hcatFieldSchema.getStructSubSchema();
-      List<FieldSchema> fields = fSchema.schema.getFields();
-      for (int i = 0; i < fields.size(); i++) {
-        FieldSchema fieldSchema = fields.get(i);
-        fieldNames.add(fieldSchema.alias);
-        hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? null : structSubSchema.get(i)));
-      }
-      return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(hcatFSs), "");
-
-    case DataType.MAP:{
-      // Pig's schema contain no type information about map's keys and
-      // values. So, if its a new column assume <string,string> if its existing
-      // return whatever is contained in the existing column.
-
-      HCatFieldSchema valFS;
-      List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1);
-
-      if(hcatFieldSchema != null){
-        return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, hcatFieldSchema.getMapValueSchema(), "");
-      }
-
-      // Column not found in target table. Its a new column. Its schema is map<string,string>
-      valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, "");
-      valFSList.add(valFS);
-      return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),"");
-     }
-
-    default:
-      throw new FrontendException("Unsupported type: "+type+"  in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE);
-    }
-  }
-
-  @Override
-  public void prepareToWrite(RecordWriter writer) throws IOException {
-    this.writer = writer;
-    computedSchema = (HCatSchema)ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).getProperty(COMPUTED_OUTPUT_SCHEMA));
-  }
-
-  @Override
-  public void putNext(Tuple tuple) throws IOException {
-
-    List<Object> outgoing = new ArrayList<Object>(tuple.size());
-
-    int i = 0;
-    for(HCatFieldSchema fSchema : computedSchema.getFields()){
-      outgoing.add(getJavaObj(tuple.get(i++), fSchema));
-    }
-    try {
-      writer.write(null, new DefaultHCatRecord(outgoing));
-    } catch (InterruptedException e) {
-      throw new BackendException("Error while writing tuple: "+tuple, PigHCatUtil.PIG_EXCEPTION_CODE, e);
-    }
-  }
-
-  private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatException, BackendException{
-    try {
-
-      // The real work-horse. Spend time and energy in this method if there is
-      // need to keep HCatStorer lean and go fast.
-      Type type = hcatFS.getType();
-      switch(type){
-
-      case BINARY:
-        if (pigObj == null) {
-          return null;
-        }          
-        return ((DataByteArray)pigObj).get();
-
-      case STRUCT:
-        if (pigObj == null) {
-          return null;
-        }
-        HCatSchema structSubSchema = hcatFS.getStructSubSchema();
-        // Unwrap the tuple.
-        List<Object> all = ((Tuple)pigObj).getAll();
-        ArrayList<Object> converted = new ArrayList<Object>(all.size());
-        for (int i = 0; i < all.size(); i++) {
-          converted.add(getJavaObj(all.get(i), structSubSchema.get(i)));
-        }
-        return converted;
-
-      case ARRAY:
-        if (pigObj == null) {
-          return null;
-        }
-        // Unwrap the bag.
-        DataBag pigBag = (DataBag)pigObj;
-        HCatFieldSchema tupFS = hcatFS.getArrayElementSchema().get(0);
-        boolean needTuple = tupFS.getType() == Type.STRUCT;
-        List<Object> bagContents = new ArrayList<Object>((int)pigBag.size());
-        Iterator<Tuple> bagItr = pigBag.iterator();
-
-        while(bagItr.hasNext()){
-          // If there is only one element in tuple contained in bag, we throw away the tuple.
-          bagContents.add(getJavaObj(needTuple ? bagItr.next() : bagItr.next().get(0), tupFS));
-
-        }
-        return bagContents;
-      case MAP:
-        if (pigObj == null) {
-          return null;
-        }
-        Map<?,?> pigMap = (Map<?,?>)pigObj;
-        Map<Object,Object> typeMap = new HashMap<Object, Object>();
-        for(Entry<?, ?> entry: pigMap.entrySet()){
-          // the value has a schema and not a FieldSchema
-          typeMap.put(
-              // Schema validation enforces that the Key is a String
-              (String)entry.getKey(),
-              getJavaObj(entry.getValue(), hcatFS.getMapValueSchema().get(0)));
-        }
-        return typeMap;
-      case STRING:
-      case INT:
-      case BIGINT:
-      case FLOAT:
-      case DOUBLE:
-        return pigObj;
-      case SMALLINT:
-        if (pigObj == null) {
-          return null;
-        }
-        if ((Integer) pigObj < Short.MIN_VALUE || (Integer) pigObj > Short.MAX_VALUE) {
-          throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
-              hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
-        }
-        return ((Integer) pigObj).shortValue();
-      case TINYINT:
-        if (pigObj == null) {
-          return null;
-        }
-        if ((Integer) pigObj < Byte.MIN_VALUE || (Integer) pigObj > Byte.MAX_VALUE) {
-          throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
-              hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
-        }
-        return ((Integer) pigObj).byteValue();
-      case BOOLEAN:
-        // would not pass schema validation anyway
-        throw new BackendException("Incompatible type "+type+" found in hcat table schema: "+hcatFS, PigHCatUtil.PIG_EXCEPTION_CODE);
-      default:
-        throw new BackendException("Unexpected type "+type+" for value "+pigObj + (pigObj == null ? "" : " of class " + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
-    } catch (BackendException e) {
-      // provide the path to the field in the error message
-      throw new BackendException(
-          (hcatFS.getName() == null ? " " : hcatFS.getName()+".") + e.getMessage(),
-          e.getCause() == null ? e : e.getCause());
-    }
-  }
-
-  @Override
-  public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
-
-    // Need to necessarily override this method since default impl assumes HDFS
-    // based location string.
-    return location;
-  }
-
-  @Override
-  public void setStoreFuncUDFContextSignature(String signature) {
-    sign = signature;
-  }
-
-
-  protected void doSchemaValidations(Schema pigSchema, HCatSchema tblSchema) throws FrontendException, HCatException{
-
-    // Iterate through all the elements in Pig Schema and do validations as
-    // dictated by semantics, consult HCatSchema of table when need be.
-
-    for(FieldSchema pigField : pigSchema.getFields()){
-      HCatFieldSchema hcatField = getColFromSchema(pigField.alias, tblSchema);
-      validateSchema(pigField, hcatField);
-    }
-
-    try {
-      PigHCatUtil.validateHCatTableSchemaFollowsPigRules(tblSchema);
-    } catch (IOException e) {
-      throw new FrontendException("HCatalog schema is not compatible with Pig: "+e.getMessage(),  PigHCatUtil.PIG_EXCEPTION_CODE, e);
-    }
-  }
-
-
-  private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField)
-      throws HCatException, FrontendException {
-    validateAlias(pigField.alias);
-    byte type = pigField.type;
-    if(DataType.isComplex(type)){
-      switch(type){
-
-      case DataType.MAP:
-        if(hcatField != null){
-          if(hcatField.getMapKeyType() != Type.STRING){
-            throw new FrontendException("Key Type of map must be String "+hcatField,  PigHCatUtil.PIG_EXCEPTION_CODE);
-          }
-          // Map values can be primitive or complex
-        }
-        break;
-
-      case DataType.BAG:
-        HCatSchema arrayElementSchema = hcatField == null ? null : hcatField.getArrayElementSchema();
-        for(FieldSchema innerField : pigField.schema.getField(0).schema.getFields()){
-          validateSchema(innerField, getColFromSchema(pigField.alias, arrayElementSchema));
-        }
-        break;
-
-      case DataType.TUPLE:
-        HCatSchema structSubSchema = hcatField == null ? null : hcatField.getStructSubSchema();
-        for(FieldSchema innerField : pigField.schema.getFields()){
-          validateSchema(innerField, getColFromSchema(pigField.alias, structSubSchema));
-        }
-        break;
-
-      default:
-        throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
-    }
-  }
-
-  private void validateAlias(String alias) throws FrontendException{
-    if(alias == null) {
-      throw new FrontendException("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.", PigHCatUtil.PIG_EXCEPTION_CODE);
-    }
-    if(alias.matches(".*[A-Z]+.*")) {
-      throw new FrontendException("Column names should all be in lowercase. Invalid name found: "+alias, PigHCatUtil.PIG_EXCEPTION_CODE);
-    }
-  }
-
-  // Finds column by name in HCatSchema, if not found returns null.
-  private HCatFieldSchema getColFromSchema(String alias, HCatSchema tblSchema){
-    if (tblSchema != null) {
-      for(HCatFieldSchema hcatField : tblSchema.getFields()){
-        if(hcatField!=null && hcatField.getName()!= null && hcatField.getName().equalsIgnoreCase(alias)){
-          return hcatField;
-        }
-      }
-    }
-    // Its a new column
-    return null;
-  }
-
-  @Override
-  public void cleanupOnFailure(String location, Job job) throws IOException {
-    // No-op.
-  }
-
-  @Override
-  public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException {
-  }
+    protected HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException {
+        List<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(pigSchema.size());
+        for (FieldSchema fSchema : pigSchema.getFields()) {
+            try {
+                HCatFieldSchema hcatFieldSchema = getColFromSchema(fSchema.alias, tableSchema);
+
+                fieldSchemas.add(getHCatFSFromPigFS(fSchema, hcatFieldSchema));
+            } catch (HCatException he) {
+                throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+            }
+        }
+        return new HCatSchema(fieldSchemas);
+    }
+
+    public static boolean removeTupleFromBag(HCatFieldSchema hcatFieldSchema, FieldSchema bagFieldSchema) throws HCatException {
+        if (hcatFieldSchema != null && hcatFieldSchema.getArrayElementSchema().get(0).getType() != Type.STRUCT) {
+            return true;
+        }
+        // Column was not found in table schema. Its a new column
+        List<FieldSchema> tupSchema = bagFieldSchema.schema.getFields();
+        if (hcatFieldSchema == null && tupSchema.size() == 1 && (tupSchema.get(0).schema == null || (tupSchema.get(0).type == DataType.TUPLE && tupSchema.get(0).schema.size() == 1))) {
+            return true;
+        }
+        return false;
+    }
+
+
+    private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema) throws FrontendException, HCatException {
+        byte type = fSchema.type;
+        switch (type) {
+
+        case DataType.CHARARRAY:
+        case DataType.BIGCHARARRAY:
+            return new HCatFieldSchema(fSchema.alias, Type.STRING, null);
+
+        case DataType.INTEGER:
+            if (hcatFieldSchema != null) {
+                if (!SUPPORTED_INTEGER_CONVERSIONS.contains(hcatFieldSchema.getType())) {
+                    throw new FrontendException("Unsupported type: " + type + "  in Pig's schema",
+                        PigHCatUtil.PIG_EXCEPTION_CODE);
+                }
+                return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getType(), null);
+            } else {
+                return new HCatFieldSchema(fSchema.alias, Type.INT, null);
+            }
+
+        case DataType.LONG:
+            return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null);
+
+        case DataType.FLOAT:
+            return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null);
+
+        case DataType.DOUBLE:
+            return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null);
+
+        case DataType.BYTEARRAY:
+            return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
+
+        case DataType.BAG:
+            Schema bagSchema = fSchema.schema;
+            List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
+            FieldSchema field;
+            // Find out if we need to throw away the tuple or not.
+            if (removeTupleFromBag(hcatFieldSchema, fSchema)) {
+                field = bagSchema.getField(0).schema.getField(0);
+            } else {
+                field = bagSchema.getField(0);
+            }
+            arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema.getArrayElementSchema().get(0)));
+            return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), "");
+
+        case DataType.TUPLE:
+            List<String> fieldNames = new ArrayList<String>();
+            List<HCatFieldSchema> hcatFSs = new ArrayList<HCatFieldSchema>();
+            HCatSchema structSubSchema = hcatFieldSchema == null ? null : hcatFieldSchema.getStructSubSchema();
+            List<FieldSchema> fields = fSchema.schema.getFields();
+            for (int i = 0; i < fields.size(); i++) {
+                FieldSchema fieldSchema = fields.get(i);
+                fieldNames.add(fieldSchema.alias);
+                hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? null : structSubSchema.get(i)));
+            }
+            return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(hcatFSs), "");
+
+        case DataType.MAP: {
+            // Pig's schema contain no type information about map's keys and
+            // values. So, if its a new column assume <string,string> if its existing
+            // return whatever is contained in the existing column.
+
+            HCatFieldSchema valFS;
+            List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1);
+
+            if (hcatFieldSchema != null) {
+                return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, hcatFieldSchema.getMapValueSchema(), "");
+            }
+
+            // Column not found in target table. Its a new column. Its schema is map<string,string>
+            valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, "");
+            valFSList.add(valFS);
+            return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, new HCatSchema(valFSList), "");
+        }
+
+        default:
+            throw new FrontendException("Unsupported type: " + type + "  in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE);
+        }
+    }
+
+    @Override
+    public void prepareToWrite(RecordWriter writer) throws IOException {
+        this.writer = writer;
+        computedSchema = (HCatSchema) ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).getProperty(COMPUTED_OUTPUT_SCHEMA));
+    }
+
+    @Override
+    public void putNext(Tuple tuple) throws IOException {
+
+        List<Object> outgoing = new ArrayList<Object>(tuple.size());
+
+        int i = 0;
+        for (HCatFieldSchema fSchema : computedSchema.getFields()) {
+            outgoing.add(getJavaObj(tuple.get(i++), fSchema));
+        }
+        try {
+            writer.write(null, new DefaultHCatRecord(outgoing));
+        } catch (InterruptedException e) {
+            throw new BackendException("Error while writing tuple: " + tuple, PigHCatUtil.PIG_EXCEPTION_CODE, e);
+        }
+    }
+
+    private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatException, BackendException {
+        try {
+
+            // The real work-horse. Spend time and energy in this method if there is
+            // need to keep HCatStorer lean and go fast.
+            Type type = hcatFS.getType();
+            switch (type) {
+
+            case BINARY:
+                if (pigObj == null) {
+                    return null;
+                }
+                return ((DataByteArray) pigObj).get();
+
+            case STRUCT:
+                if (pigObj == null) {
+                    return null;
+                }
+                HCatSchema structSubSchema = hcatFS.getStructSubSchema();
+                // Unwrap the tuple.
+                List<Object> all = ((Tuple) pigObj).getAll();
+                ArrayList<Object> converted = new ArrayList<Object>(all.size());
+                for (int i = 0; i < all.size(); i++) {
+                    converted.add(getJavaObj(all.get(i), structSubSchema.get(i)));
+                }
+                return converted;
+
+            case ARRAY:
+                if (pigObj == null) {
+                    return null;
+                }
+                // Unwrap the bag.
+                DataBag pigBag = (DataBag) pigObj;
+                HCatFieldSchema tupFS = hcatFS.getArrayElementSchema().get(0);
+                boolean needTuple = tupFS.getType() == Type.STRUCT;
+                List<Object> bagContents = new ArrayList<Object>((int) pigBag.size());
+                Iterator<Tuple> bagItr = pigBag.iterator();
+
+                while (bagItr.hasNext()) {
+                    // If there is only one element in tuple contained in bag, we throw away the tuple.
+                    bagContents.add(getJavaObj(needTuple ? bagItr.next() : bagItr.next().get(0), tupFS));
+
+                }
+                return bagContents;
+            case MAP:
+                if (pigObj == null) {
+                    return null;
+                }
+                Map<?, ?> pigMap = (Map<?, ?>) pigObj;
+                Map<Object, Object> typeMap = new HashMap<Object, Object>();
+                for (Entry<?, ?> entry : pigMap.entrySet()) {
+                    // the value has a schema and not a FieldSchema
+                    typeMap.put(
+                        // Schema validation enforces that the Key is a String
+                        (String) entry.getKey(),
+                        getJavaObj(entry.getValue(), hcatFS.getMapValueSchema().get(0)));
+                }
+                return typeMap;
+            case STRING:
+            case INT:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+                return pigObj;
+            case SMALLINT:
+                if (pigObj == null) {
+                    return null;
+                }
+                if ((Integer) pigObj < Short.MIN_VALUE || (Integer) pigObj > Short.MAX_VALUE) {
+                    throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
+                        hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
+                }
+                return ((Integer) pigObj).shortValue();
+            case TINYINT:
+                if (pigObj == null) {
+                    return null;
+                }
+                if ((Integer) pigObj < Byte.MIN_VALUE || (Integer) pigObj > Byte.MAX_VALUE) {
+                    throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
+                        hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
+                }
+                return ((Integer) pigObj).byteValue();
+            case BOOLEAN:
+                // would not pass schema validation anyway
+                throw new BackendException("Incompatible type " + type + " found in hcat table schema: " + hcatFS, PigHCatUtil.PIG_EXCEPTION_CODE);
+            default:
+                throw new BackendException("Unexpected type " + type + " for value " + pigObj + (pigObj == null ? "" : " of class " + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+        } catch (BackendException e) {
+            // provide the path to the field in the error message
+            throw new BackendException(
+                (hcatFS.getName() == null ? " " : hcatFS.getName() + ".") + e.getMessage(),
+                e.getCause() == null ? e : e.getCause());
+        }
+    }
+
+    @Override
+    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+
+        // Need to necessarily override this method since default impl assumes HDFS
+        // based location string.
+        return location;
+    }
+
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+        sign = signature;
+    }
+
+
+    protected void doSchemaValidations(Schema pigSchema, HCatSchema tblSchema) throws FrontendException, HCatException {
+
+        // Iterate through all the elements in Pig Schema and do validations as
+        // dictated by semantics, consult HCatSchema of table when need be.
+
+        for (FieldSchema pigField : pigSchema.getFields()) {
+            HCatFieldSchema hcatField = getColFromSchema(pigField.alias, tblSchema);
+            validateSchema(pigField, hcatField);
+        }
+
+        try {
+            PigHCatUtil.validateHCatTableSchemaFollowsPigRules(tblSchema);
+        } catch (IOException e) {
+            throw new FrontendException("HCatalog schema is not compatible with Pig: " + e.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, e);
+        }
+    }
+
+
+    private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField)
+        throws HCatException, FrontendException {
+        validateAlias(pigField.alias);
+        byte type = pigField.type;
+        if (DataType.isComplex(type)) {
+            switch (type) {
+
+            case DataType.MAP:
+                if (hcatField != null) {
+                    if (hcatField.getMapKeyType() != Type.STRING) {
+                        throw new FrontendException("Key Type of map must be String " + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
+                    }
+                    // Map values can be primitive or complex
+                }
+                break;
+
+            case DataType.BAG:
+                HCatSchema arrayElementSchema = hcatField == null ? null : hcatField.getArrayElementSchema();
+                for (FieldSchema innerField : pigField.schema.getField(0).schema.getFields()) {
+                    validateSchema(innerField, getColFromSchema(pigField.alias, arrayElementSchema));
+                }
+                break;
+
+            case DataType.TUPLE:
+                HCatSchema structSubSchema = hcatField == null ? null : hcatField.getStructSubSchema();
+                for (FieldSchema innerField : pigField.schema.getFields()) {
+                    validateSchema(innerField, getColFromSchema(pigField.alias, structSubSchema));
+                }
+                break;
+
+            default:
+                throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+        }
+    }
+
+    private void validateAlias(String alias) throws FrontendException {
+        if (alias == null) {
+            throw new FrontendException("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.", PigHCatUtil.PIG_EXCEPTION_CODE);
+        }
+        if (alias.matches(".*[A-Z]+.*")) {
+            throw new FrontendException("Column names should all be in lowercase. Invalid name found: " + alias, PigHCatUtil.PIG_EXCEPTION_CODE);
+        }
+    }
+
+    // Finds column by name in HCatSchema, if not found returns null.
+    private HCatFieldSchema getColFromSchema(String alias, HCatSchema tblSchema) {
+        if (tblSchema != null) {
+            for (HCatFieldSchema hcatField : tblSchema.getFields()) {
+                if (hcatField != null && hcatField.getName() != null && hcatField.getName().equalsIgnoreCase(alias)) {
+                    return hcatField;
+                }
+            }
+        }
+        // Its a new column
+        return null;
+    }
+
+    @Override
+    public void cleanupOnFailure(String location, Job job) throws IOException {
+        // No-op.
+    }
+
+    @Override
+    public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException {
+    }
 }

Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java Mon Sep 10 23:28:55 2012
@@ -51,59 +51,59 @@ import org.apache.pig.impl.util.UDFConte
 
 public class HCatLoader extends HCatBaseLoader {
 
-  private static final String PARTITION_FILTER = "partition.filter"; // for future use
+    private static final String PARTITION_FILTER = "partition.filter"; // for future use
 
-  private HCatInputFormat hcatInputFormat = null;
-  private String dbName;
-  private String tableName;
-  private String hcatServerUri;
-  private String partitionFilterString;
-  private final PigHCatUtil phutil = new PigHCatUtil();
-
-  // Signature for wrapped loader, see comments in LoadFuncBasedInputDriver.initialize
-  final public static String INNER_SIGNATURE = "hcatloader.inner.signature";
-  final public static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature";
-  // A hash map which stores job credentials. The key is a signature passed by Pig, which is
-  //unique to the load func and input file name (table, in our case).
-  private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
-
-  @Override
-  public InputFormat<?,?> getInputFormat() throws IOException {
-    if(hcatInputFormat == null) {
-      hcatInputFormat = new HCatInputFormat();
-    }
-    return hcatInputFormat;
-  }
-
-  @Override
-  public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
-    return location;
-  }
-
-@Override
-  public void setLocation(String location, Job job) throws IOException {
-
-    UDFContext udfContext = UDFContext.getUDFContext();
-    Properties udfProps = udfContext.getUDFProperties(this.getClass(),
-        new String[]{signature});
-    job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + signature);
-    Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
-    dbName = dbTablePair.first;
-    tableName = dbTablePair.second;
-
-    RequiredFieldList requiredFieldsInfo = (RequiredFieldList) udfProps
-    .get(PRUNE_PROJECTION_INFO);
-    // get partitionFilterString stored in the UDFContext - it would have
-    // been stored there by an earlier call to setPartitionFilter
-    // call setInput on HCatInputFormat only in the frontend because internally
-    // it makes calls to the hcat server - we don't want these to happen in
-    // the backend
-    // in the hadoop front end mapred.task.id property will not be set in
-    // the Configuration
+    private HCatInputFormat hcatInputFormat = null;
+    private String dbName;
+    private String tableName;
+    private String hcatServerUri;
+    private String partitionFilterString;
+    private final PigHCatUtil phutil = new PigHCatUtil();
+
+    // Signature for wrapped loader, see comments in LoadFuncBasedInputDriver.initialize
+    final public static String INNER_SIGNATURE = "hcatloader.inner.signature";
+    final public static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature";
+    // A hash map which stores job credentials. The key is a signature passed by Pig, which is
+    //unique to the load func and input file name (table, in our case).
+    private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
+
+    @Override
+    public InputFormat<?, ?> getInputFormat() throws IOException {
+        if (hcatInputFormat == null) {
+            hcatInputFormat = new HCatInputFormat();
+        }
+        return hcatInputFormat;
+    }
+
+    @Override
+    public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+        return location;
+    }
+
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+
+        UDFContext udfContext = UDFContext.getUDFContext();
+        Properties udfProps = udfContext.getUDFProperties(this.getClass(),
+            new String[]{signature});
+        job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + signature);
+        Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
+        dbName = dbTablePair.first;
+        tableName = dbTablePair.second;
+
+        RequiredFieldList requiredFieldsInfo = (RequiredFieldList) udfProps
+            .get(PRUNE_PROJECTION_INFO);
+        // get partitionFilterString stored in the UDFContext - it would have
+        // been stored there by an earlier call to setPartitionFilter
+        // call setInput on HCatInputFormat only in the frontend because internally
+        // it makes calls to the hcat server - we don't want these to happen in
+        // the backend
+        // in the hadoop front end mapred.task.id property will not be set in
+        // the Configuration
         if (udfProps.containsKey(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET)) {
-            for( Enumeration<Object> emr = udfProps.keys();emr.hasMoreElements();) {
+            for (Enumeration<Object> emr = udfProps.keys(); emr.hasMoreElements(); ) {
                 PigHCatUtil.getConfigFromUDFProperties(udfProps,
-                            job.getConfiguration(), emr.nextElement().toString());
+                    job.getConfiguration(), emr.nextElement().toString());
             }
             if (!HCatUtil.checkJobContextIfRunningFromBackend(job)) {
                 //Combine credentials and credentials from job takes precedence for freshness
@@ -114,12 +114,12 @@ public class HCatLoader extends HCatBase
         } else {
             Job clone = new Job(job.getConfiguration());
             HCatInputFormat.setInput(job, InputJobInfo.create(dbName,
-                    tableName, getPartitionFilterString()));
+                tableName, getPartitionFilterString()));
 
             // We will store all the new /changed properties in the job in the
             // udf context, so the the HCatInputFormat.setInput method need not
             //be called many times.
-            for (Entry<String,String> keyValue : job.getConfiguration()) {
+            for (Entry<String, String> keyValue : job.getConfiguration()) {
                 String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
                 if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
                     udfProps.put(keyValue.getKey(), keyValue.getValue());
@@ -144,129 +144,129 @@ public class HCatLoader extends HCatBase
         // here will ensure we communicate to HCatInputFormat about pruned
         // projections at getSplits() and createRecordReader() time
 
-        if(requiredFieldsInfo != null) {
-          // convert to hcatschema and pass to HCatInputFormat
-          try {
-            outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(),signature,this.getClass());
-            HCatInputFormat.setOutputSchema(job, outputSchema);
-          } catch (Exception e) {
-            throw new IOException(e);
-          }
-        } else{
-          // else - this means pig's optimizer never invoked the pushProjection
-          // method - so we need all fields and hence we should not call the
-          // setOutputSchema on HCatInputFormat
-          if (HCatUtil.checkJobContextIfRunningFromBackend(job)){
+        if (requiredFieldsInfo != null) {
+            // convert to hcatschema and pass to HCatInputFormat
             try {
-              HCatSchema hcatTableSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA);
-              outputSchema = hcatTableSchema;
-              HCatInputFormat.setOutputSchema(job, outputSchema);
+                outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(), signature, this.getClass());
+                HCatInputFormat.setOutputSchema(job, outputSchema);
             } catch (Exception e) {
-              throw new IOException(e);
+                throw new IOException(e);
+            }
+        } else {
+            // else - this means pig's optimizer never invoked the pushProjection
+            // method - so we need all fields and hence we should not call the
+            // setOutputSchema on HCatInputFormat
+            if (HCatUtil.checkJobContextIfRunningFromBackend(job)) {
+                try {
+                    HCatSchema hcatTableSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA);
+                    outputSchema = hcatTableSchema;
+                    HCatInputFormat.setOutputSchema(job, outputSchema);
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
             }
-          }
         }
 
-  }
+    }
 
-  @Override
-  public String[] getPartitionKeys(String location, Job job)
-  throws IOException {
-    Table table = phutil.getTable(location,
-        hcatServerUri!=null?hcatServerUri:PigHCatUtil.getHCatServerUri(job),
+    @Override
+    public String[] getPartitionKeys(String location, Job job)
+        throws IOException {
+        Table table = phutil.getTable(location,
+            hcatServerUri != null ? hcatServerUri : PigHCatUtil.getHCatServerUri(job),
             PigHCatUtil.getHCatServerPrincipal(job));
-    List<FieldSchema> tablePartitionKeys = table.getPartitionKeys();
-    String[] partitionKeys = new String[tablePartitionKeys.size()];
-    for(int i = 0; i < tablePartitionKeys.size(); i++) {
-      partitionKeys[i] = tablePartitionKeys.get(i).getName();
-    }
-    return partitionKeys;
-  }
-
-  @Override
-  public ResourceSchema getSchema(String location, Job job) throws IOException {
-    HCatContext.getInstance().mergeConf(job.getConfiguration());
-    HCatContext.getInstance().getConf().setBoolean(
-        HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true);
+        List<FieldSchema> tablePartitionKeys = table.getPartitionKeys();
+        String[] partitionKeys = new String[tablePartitionKeys.size()];
+        for (int i = 0; i < tablePartitionKeys.size(); i++) {
+            partitionKeys[i] = tablePartitionKeys.get(i).getName();
+        }
+        return partitionKeys;
+    }
+
+    @Override
+    public ResourceSchema getSchema(String location, Job job) throws IOException {
+        HCatContext.getInstance().mergeConf(job.getConfiguration());
+        HCatContext.getInstance().getConf().setBoolean(
+            HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true);
 
-    Table table = phutil.getTable(location,
-        hcatServerUri!=null?hcatServerUri:PigHCatUtil.getHCatServerUri(job),
+        Table table = phutil.getTable(location,
+            hcatServerUri != null ? hcatServerUri : PigHCatUtil.getHCatServerUri(job),
             PigHCatUtil.getHCatServerPrincipal(job));
-    HCatSchema hcatTableSchema = HCatUtil.getTableSchemaWithPtnCols(table);
-    try {
-      PigHCatUtil.validateHCatTableSchemaFollowsPigRules(hcatTableSchema);
-    } catch (IOException e){
-      throw new PigException(
-          "Table schema incompatible for reading through HCatLoader :" + e.getMessage()
-          + ";[Table schema was "+ hcatTableSchema.toString() +"]"
-          ,PigHCatUtil.PIG_EXCEPTION_CODE, e);
-    }
-    storeInUDFContext(signature, HCatConstants.HCAT_TABLE_SCHEMA, hcatTableSchema);
-    outputSchema = hcatTableSchema;
-    return PigHCatUtil.getResourceSchema(hcatTableSchema);
-  }
-
-  @Override
-  public void setPartitionFilter(Expression partitionFilter) throws IOException {
-    // convert the partition filter expression into a string expected by
-    // hcat and pass it in setLocation()
-
-    partitionFilterString = getHCatComparisonString(partitionFilter);
-
-    // store this in the udf context so we can get it later
-    storeInUDFContext(signature,
-        PARTITION_FILTER, partitionFilterString);
-  }
-
-  /**
-   * Get statistics about the data to be loaded. Only input data size is implemented at this time.
-   */
-  @Override
-  public ResourceStatistics getStatistics(String location, Job job) throws IOException {
-    try {
-      ResourceStatistics stats = new ResourceStatistics();
-      InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(
-          job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
-      stats.setmBytes(getSizeInBytes(inputJobInfo) / 1024 / 1024);
-      return stats;
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  private String getPartitionFilterString() {
-    if(partitionFilterString == null) {
-      Properties props = UDFContext.getUDFContext().getUDFProperties(
-          this.getClass(), new String[] {signature});
-      partitionFilterString = props.getProperty(PARTITION_FILTER);
-    }
-    return partitionFilterString;
-  }
-
-  private String getHCatComparisonString(Expression expr) {
-    if(expr instanceof BinaryExpression){
-      // call getHCatComparisonString on lhs and rhs, and and join the
-      // results with OpType string
-
-      // we can just use OpType.toString() on all Expression types except
-      // Equal, NotEqualt since Equal has '==' in toString() and
-      // we need '='
-      String opStr = null;
-      switch(expr.getOpType()){
-        case OP_EQ :
-          opStr = " = ";
-          break;
-        default:
-          opStr = expr.getOpType().toString();
-      }
-      BinaryExpression be = (BinaryExpression)expr;
-      return "(" + getHCatComparisonString(be.getLhs()) +
-                  opStr +
-                  getHCatComparisonString(be.getRhs()) + ")";
-    } else {
-      // should be a constant or column
-      return expr.toString();
+        HCatSchema hcatTableSchema = HCatUtil.getTableSchemaWithPtnCols(table);
+        try {
+            PigHCatUtil.validateHCatTableSchemaFollowsPigRules(hcatTableSchema);
+        } catch (IOException e) {
+            throw new PigException(
+                "Table schema incompatible for reading through HCatLoader :" + e.getMessage()
+                    + ";[Table schema was " + hcatTableSchema.toString() + "]"
+                , PigHCatUtil.PIG_EXCEPTION_CODE, e);
+        }
+        storeInUDFContext(signature, HCatConstants.HCAT_TABLE_SCHEMA, hcatTableSchema);
+        outputSchema = hcatTableSchema;
+        return PigHCatUtil.getResourceSchema(hcatTableSchema);
+    }
+
+    @Override
+    public void setPartitionFilter(Expression partitionFilter) throws IOException {
+        // convert the partition filter expression into a string expected by
+        // hcat and pass it in setLocation()
+
+        partitionFilterString = getHCatComparisonString(partitionFilter);
+
+        // store this in the udf context so we can get it later
+        storeInUDFContext(signature,
+            PARTITION_FILTER, partitionFilterString);
+    }
+
+    /**
+     * Get statistics about the data to be loaded. Only input data size is implemented at this time.
+     */
+    @Override
+    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+        try {
+            ResourceStatistics stats = new ResourceStatistics();
+            InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(
+                job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
+            stats.setmBytes(getSizeInBytes(inputJobInfo) / 1024 / 1024);
+            return stats;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    private String getPartitionFilterString() {
+        if (partitionFilterString == null) {
+            Properties props = UDFContext.getUDFContext().getUDFProperties(
+                this.getClass(), new String[]{signature});
+            partitionFilterString = props.getProperty(PARTITION_FILTER);
+        }
+        return partitionFilterString;
+    }
+
+    private String getHCatComparisonString(Expression expr) {
+        if (expr instanceof BinaryExpression) {
+            // call getHCatComparisonString on lhs and rhs, and and join the
+            // results with OpType string
+
+            // we can just use OpType.toString() on all Expression types except
+            // Equal, NotEqualt since Equal has '==' in toString() and
+            // we need '='
+            String opStr = null;
+            switch (expr.getOpType()) {
+            case OP_EQ:
+                opStr = " = ";
+                break;
+            default:
+                opStr = expr.getOpType().toString();
+            }
+            BinaryExpression be = (BinaryExpression) expr;
+            return "(" + getHCatComparisonString(be.getLhs()) +
+                opStr +
+                getHCatComparisonString(be.getRhs()) + ")";
+        } else {
+            // should be a constant or column
+            return expr.toString();
+        }
     }
-  }
 
 }

Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java Mon Sep 10 23:28:55 2012
@@ -50,114 +50,114 @@ import org.apache.pig.impl.util.UDFConte
 
 public class HCatStorer extends HCatBaseStorer {
 
-  // Signature for wrapped storer, see comments in LoadFuncBasedInputDriver.initialize
-  final public static String INNER_SIGNATURE = "hcatstorer.inner.signature";
-  final public static String INNER_SIGNATURE_PREFIX = "hcatstorer_inner_signature";
-  // A hash map which stores job credentials. The key is a signature passed by Pig, which is
-  //unique to the store func and out file name (table, in our case).
-  private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
-
-
-  public HCatStorer(String partSpecs, String schema) throws Exception {
-    super(partSpecs, schema);
-  }
-
-  public HCatStorer(String partSpecs) throws Exception {
-    this(partSpecs, null);
-  }
-
-  public HCatStorer() throws Exception{
-    this(null,null);
-  }
-
-  @Override
-  public OutputFormat getOutputFormat() throws IOException {
-    return new HCatOutputFormat();
-  }
-
-  @Override
-  public void setStoreLocation(String location, Job job) throws IOException {
-    HCatContext.getInstance().mergeConf(job.getConfiguration());
-    HCatContext.getInstance().getConf().setBoolean(
-        HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, false);
-
-    Configuration config = job.getConfiguration();
-    config.set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
-    Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
-            this.getClass(), new String[] { sign });
-    String[] userStr = location.split("\\.");
-
-    if (udfProps.containsKey(HCatConstants.HCAT_PIG_STORER_LOCATION_SET)) {
-      for(Enumeration<Object> emr = udfProps.keys();emr.hasMoreElements();){
-        PigHCatUtil.getConfigFromUDFProperties(udfProps, config, emr.nextElement().toString());
-      }
-      Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + sign);
-      if (crd != null) {
-        job.getCredentials().addAll(crd);
-      }
-    } else {
-      Job clone = new Job(job.getConfiguration());
-      OutputJobInfo outputJobInfo;
-      if (userStr.length == 2) {
-        outputJobInfo = OutputJobInfo.create(userStr[0], userStr[1], partitions);
-      } else if (userStr.length == 1) {
-        outputJobInfo = OutputJobInfo.create(null, userStr[0], partitions);
-      } else {
-        throw new FrontendException("location " + location
-              + " is invalid. It must be of the form [db.]table",
-              PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
-      Schema schema = (Schema) ObjectSerializer.deserialize(udfProps.getProperty(PIG_SCHEMA));
-      if (schema != null) {
-        pigSchema = schema;
-      }
-      if (pigSchema == null) {
-        throw new FrontendException(
-            "Schema for data cannot be determined.",
-            PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
-      try {
-        HCatOutputFormat.setOutput(job, outputJobInfo);
-      } catch (HCatException he) {
-        // pass the message to the user - essentially something about
-        // the table
-        // information passed to HCatOutputFormat was not right
-        throw new PigException(he.getMessage(),
-            PigHCatUtil.PIG_EXCEPTION_CODE, he);
-      }
-      HCatSchema hcatTblSchema = HCatOutputFormat.getTableSchema(job);
-      try {
-        doSchemaValidations(pigSchema, hcatTblSchema);
-      } catch (HCatException he) {
-        throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
-      }
-      computedSchema = convertPigSchemaToHCatSchema(pigSchema, hcatTblSchema);
-      HCatOutputFormat.setSchema(job, computedSchema);
-      udfProps.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
-
-      // We will store all the new /changed properties in the job in the
-      // udf context, so the the HCatOutputFormat.setOutput and setSchema
-      // methods need not be called many times.
-      for ( Entry<String,String> keyValue : job.getConfiguration()) {
-        String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
-        if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
-          udfProps.put(keyValue.getKey(), keyValue.getValue());
+    // Signature for wrapped storer, see comments in LoadFuncBasedInputDriver.initialize
+    final public static String INNER_SIGNATURE = "hcatstorer.inner.signature";
+    final public static String INNER_SIGNATURE_PREFIX = "hcatstorer_inner_signature";
+    // A hash map which stores job credentials. The key is a signature passed by Pig, which is
+    //unique to the store func and out file name (table, in our case).
+    private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
+
+
+    public HCatStorer(String partSpecs, String schema) throws Exception {
+        super(partSpecs, schema);
+    }
+
+    public HCatStorer(String partSpecs) throws Exception {
+        this(partSpecs, null);
+    }
+
+    public HCatStorer() throws Exception {
+        this(null, null);
+    }
+
+    @Override
+    public OutputFormat getOutputFormat() throws IOException {
+        return new HCatOutputFormat();
+    }
+
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+        HCatContext.getInstance().mergeConf(job.getConfiguration());
+        HCatContext.getInstance().getConf().setBoolean(
+            HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, false);
+
+        Configuration config = job.getConfiguration();
+        config.set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
+        Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
+            this.getClass(), new String[]{sign});
+        String[] userStr = location.split("\\.");
+
+        if (udfProps.containsKey(HCatConstants.HCAT_PIG_STORER_LOCATION_SET)) {
+            for (Enumeration<Object> emr = udfProps.keys(); emr.hasMoreElements(); ) {
+                PigHCatUtil.getConfigFromUDFProperties(udfProps, config, emr.nextElement().toString());
+            }
+            Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + sign);
+            if (crd != null) {
+                job.getCredentials().addAll(crd);
+            }
+        } else {
+            Job clone = new Job(job.getConfiguration());
+            OutputJobInfo outputJobInfo;
+            if (userStr.length == 2) {
+                outputJobInfo = OutputJobInfo.create(userStr[0], userStr[1], partitions);
+            } else if (userStr.length == 1) {
+                outputJobInfo = OutputJobInfo.create(null, userStr[0], partitions);
+            } else {
+                throw new FrontendException("location " + location
+                    + " is invalid. It must be of the form [db.]table",
+                    PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+            Schema schema = (Schema) ObjectSerializer.deserialize(udfProps.getProperty(PIG_SCHEMA));
+            if (schema != null) {
+                pigSchema = schema;
+            }
+            if (pigSchema == null) {
+                throw new FrontendException(
+                    "Schema for data cannot be determined.",
+                    PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+            try {
+                HCatOutputFormat.setOutput(job, outputJobInfo);
+            } catch (HCatException he) {
+                // pass the message to the user - essentially something about
+                // the table
+                // information passed to HCatOutputFormat was not right
+                throw new PigException(he.getMessage(),
+                    PigHCatUtil.PIG_EXCEPTION_CODE, he);
+            }
+            HCatSchema hcatTblSchema = HCatOutputFormat.getTableSchema(job);
+            try {
+                doSchemaValidations(pigSchema, hcatTblSchema);
+            } catch (HCatException he) {
+                throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+            }
+            computedSchema = convertPigSchemaToHCatSchema(pigSchema, hcatTblSchema);
+            HCatOutputFormat.setSchema(job, computedSchema);
+            udfProps.setProperty(COMPUTED_OUTPUT_SCHEMA, ObjectSerializer.serialize(computedSchema));
+
+            // We will store all the new /changed properties in the job in the
+            // udf context, so the the HCatOutputFormat.setOutput and setSchema
+            // methods need not be called many times.
+            for (Entry<String, String> keyValue : job.getConfiguration()) {
+                String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
+                if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
+                    udfProps.put(keyValue.getKey(), keyValue.getValue());
+                }
+            }
+            //Store credentials in a private hash map and not the udf context to
+            // make sure they are not public.
+            jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + sign, job.getCredentials());
+            udfProps.put(HCatConstants.HCAT_PIG_STORER_LOCATION_SET, true);
         }
-      }
-      //Store credentials in a private hash map and not the udf context to
-      // make sure they are not public.
-      jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + sign,job.getCredentials());
-      udfProps.put(HCatConstants.HCAT_PIG_STORER_LOCATION_SET, true);
-    }
-  }
-
-  @Override
-  public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
-    HCatHadoopShims.Instance.get().commitJob(getOutputFormat(), schema, arg1, job);
-  }
-
-  @Override
-  public void cleanupOnFailure(String location, Job job) throws IOException {
-      HCatHadoopShims.Instance.get().abortJob(getOutputFormat(), job);
-  }
+    }
+
+    @Override
+    public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
+        HCatHadoopShims.Instance.get().commitJob(getOutputFormat(), schema, arg1, job);
+    }
+
+    @Override
+    public void cleanupOnFailure(String location, Job job) throws IOException {
+        HCatHadoopShims.Instance.get().abortJob(getOutputFormat(), job);
+    }
 }