Posted to hcatalog-commits@incubator.apache.org by tr...@apache.org on 2012/09/10 23:29:03 UTC
svn commit: r1383152 [2/27] - in /incubator/hcatalog/trunk: ./
hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/
hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/
hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/ s...
Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java Mon Sep 10 23:28:55 2012
@@ -60,382 +60,382 @@ import org.apache.pig.impl.util.Utils;
public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata {
- private static final List<Type> SUPPORTED_INTEGER_CONVERSIONS =
- Lists.newArrayList(Type.TINYINT, Type.SMALLINT, Type.INT);
- protected static final String COMPUTED_OUTPUT_SCHEMA = "hcat.output.schema";
- protected final List<String> partitionKeys;
- protected final Map<String,String> partitions;
- protected Schema pigSchema;
- private RecordWriter<WritableComparable<?>, HCatRecord> writer;
- protected HCatSchema computedSchema;
- protected static final String PIG_SCHEMA = "hcat.pig.store.schema";
- protected String sign;
-
- public HCatBaseStorer(String partSpecs, String schema) throws Exception {
-
- partitionKeys = new ArrayList<String>();
- partitions = new HashMap<String, String>();
- if(partSpecs != null && !partSpecs.trim().isEmpty()){
- String[] partKVPs = partSpecs.split(",");
- for(String partKVP : partKVPs){
- String[] partKV = partKVP.split("=");
- if(partKV.length == 2) {
- String partKey = partKV[0].trim();
- partitionKeys.add(partKey);
- partitions.put(partKey, partKV[1].trim());
- } else {
- throw new FrontendException("Invalid partition column specification. "+partSpecs, PigHCatUtil.PIG_EXCEPTION_CODE);
+ private static final List<Type> SUPPORTED_INTEGER_CONVERSIONS =
+ Lists.newArrayList(Type.TINYINT, Type.SMALLINT, Type.INT);
+ protected static final String COMPUTED_OUTPUT_SCHEMA = "hcat.output.schema";
+ protected final List<String> partitionKeys;
+ protected final Map<String, String> partitions;
+ protected Schema pigSchema;
+ private RecordWriter<WritableComparable<?>, HCatRecord> writer;
+ protected HCatSchema computedSchema;
+ protected static final String PIG_SCHEMA = "hcat.pig.store.schema";
+ protected String sign;
+
+ public HCatBaseStorer(String partSpecs, String schema) throws Exception {
+
+ partitionKeys = new ArrayList<String>();
+ partitions = new HashMap<String, String>();
+ if (partSpecs != null && !partSpecs.trim().isEmpty()) {
+ String[] partKVPs = partSpecs.split(",");
+ for (String partKVP : partKVPs) {
+ String[] partKV = partKVP.split("=");
+ if (partKV.length == 2) {
+ String partKey = partKV[0].trim();
+ partitionKeys.add(partKey);
+ partitions.put(partKey, partKV[1].trim());
+ } else {
+ throw new FrontendException("Invalid partition column specification. " + partSpecs, PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ }
+ }
+
+ if (schema != null) {
+ pigSchema = Utils.getSchemaFromString(schema);
}
- }
- }
- if(schema != null) {
- pigSchema = Utils.getSchemaFromString(schema);
}
- }
-
- @Override
- public void checkSchema(ResourceSchema resourceSchema) throws IOException {
+ @Override
+ public void checkSchema(ResourceSchema resourceSchema) throws IOException {
+
+        /* The schema provided by the user and the schema computed by Pig
+         * at the time of calling store must match.
+ */
+ Schema runtimeSchema = Schema.getPigSchema(resourceSchema);
+ if (pigSchema != null) {
+ if (!Schema.equals(runtimeSchema, pigSchema, false, true)) {
+                throw new FrontendException("Schema provided in store statement doesn't match with the Schema " +
+                    "returned by Pig run-time. Schema provided in HCatStorer: " + pigSchema.toString() + " Schema received from Pig runtime: " + runtimeSchema.toString(), PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ } else {
+ pigSchema = runtimeSchema;
+ }
+ UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).setProperty(PIG_SCHEMA, ObjectSerializer.serialize(pigSchema));
+ }
- /* Schema provided by user and the schema computed by Pig
- * at the time of calling store must match.
+    /** Constructs HCatSchema from pigSchema. The passed tableSchema is the
+     * existing schema of the table in the metastore.
*/
- Schema runtimeSchema = Schema.getPigSchema(resourceSchema);
- if(pigSchema != null){
- if(! Schema.equals(runtimeSchema, pigSchema, false, true) ){
- throw new FrontendException("Schema provided in store statement doesn't match with the Schema" +
- "returned by Pig run-time. Schema provided in HCatStorer: "+pigSchema.toString()+ " Schema received from Pig runtime: "+runtimeSchema.toString(), PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- } else {
- pigSchema = runtimeSchema;
- }
- UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).setProperty(PIG_SCHEMA,ObjectSerializer.serialize(pigSchema));
- }
-
- /** Constructs HCatSchema from pigSchema. Passed tableSchema is the existing
- * schema of the table in metastore.
- */
- protected HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException{
- List<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(pigSchema.size());
- for(FieldSchema fSchema : pigSchema.getFields()){
- try {
- HCatFieldSchema hcatFieldSchema = getColFromSchema(fSchema.alias, tableSchema);
-
- fieldSchemas.add(getHCatFSFromPigFS(fSchema, hcatFieldSchema));
- } catch (HCatException he){
- throw new FrontendException(he.getMessage(),PigHCatUtil.PIG_EXCEPTION_CODE,he);
- }
- }
- return new HCatSchema(fieldSchemas);
- }
-
- public static boolean removeTupleFromBag(HCatFieldSchema hcatFieldSchema, FieldSchema bagFieldSchema) throws HCatException{
- if (hcatFieldSchema != null && hcatFieldSchema.getArrayElementSchema().get(0).getType() != Type.STRUCT) {
- return true;
- }
- // Column was not found in table schema. Its a new column
- List<FieldSchema> tupSchema = bagFieldSchema.schema.getFields();
- if (hcatFieldSchema == null && tupSchema.size() == 1 && (tupSchema.get(0).schema == null || (tupSchema.get(0).type == DataType.TUPLE && tupSchema.get(0).schema.size() == 1))) {
- return true;
- }
- return false;
- }
-
-
- private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema) throws FrontendException, HCatException{
- byte type = fSchema.type;
- switch(type){
-
- case DataType.CHARARRAY:
- case DataType.BIGCHARARRAY:
- return new HCatFieldSchema(fSchema.alias, Type.STRING, null);
-
- case DataType.INTEGER:
- if (hcatFieldSchema != null) {
- if (!SUPPORTED_INTEGER_CONVERSIONS.contains(hcatFieldSchema.getType())) {
- throw new FrontendException("Unsupported type: " + type + " in Pig's schema",
- PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getType(), null);
- } else {
- return new HCatFieldSchema(fSchema.alias, Type.INT, null);
- }
-
- case DataType.LONG:
- return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null);
-
- case DataType.FLOAT:
- return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null);
-
- case DataType.DOUBLE:
- return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null);
-
- case DataType.BYTEARRAY:
- return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
-
- case DataType.BAG:
- Schema bagSchema = fSchema.schema;
- List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
- FieldSchema field;
- // Find out if we need to throw away the tuple or not.
- if (removeTupleFromBag(hcatFieldSchema, fSchema)) {
- field = bagSchema.getField(0).schema.getField(0);
- } else {
- field = bagSchema.getField(0);
- }
- arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema.getArrayElementSchema().get(0)));
- return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), "");
-
- case DataType.TUPLE:
- List<String> fieldNames = new ArrayList<String>();
- List<HCatFieldSchema> hcatFSs = new ArrayList<HCatFieldSchema>();
- HCatSchema structSubSchema = hcatFieldSchema == null ? null : hcatFieldSchema.getStructSubSchema();
- List<FieldSchema> fields = fSchema.schema.getFields();
- for (int i = 0; i < fields.size(); i++) {
- FieldSchema fieldSchema = fields.get(i);
- fieldNames.add(fieldSchema.alias);
- hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? null : structSubSchema.get(i)));
- }
- return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(hcatFSs), "");
-
- case DataType.MAP:{
- // Pig's schema contain no type information about map's keys and
- // values. So, if its a new column assume <string,string> if its existing
- // return whatever is contained in the existing column.
-
- HCatFieldSchema valFS;
- List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1);
-
- if(hcatFieldSchema != null){
- return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, hcatFieldSchema.getMapValueSchema(), "");
- }
-
- // Column not found in target table. Its a new column. Its schema is map<string,string>
- valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, "");
- valFSList.add(valFS);
- return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),"");
- }
-
- default:
- throw new FrontendException("Unsupported type: "+type+" in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- }
-
- @Override
- public void prepareToWrite(RecordWriter writer) throws IOException {
- this.writer = writer;
- computedSchema = (HCatSchema)ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).getProperty(COMPUTED_OUTPUT_SCHEMA));
- }
-
- @Override
- public void putNext(Tuple tuple) throws IOException {
-
- List<Object> outgoing = new ArrayList<Object>(tuple.size());
-
- int i = 0;
- for(HCatFieldSchema fSchema : computedSchema.getFields()){
- outgoing.add(getJavaObj(tuple.get(i++), fSchema));
- }
- try {
- writer.write(null, new DefaultHCatRecord(outgoing));
- } catch (InterruptedException e) {
- throw new BackendException("Error while writing tuple: "+tuple, PigHCatUtil.PIG_EXCEPTION_CODE, e);
- }
- }
-
- private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatException, BackendException{
- try {
-
- // The real work-horse. Spend time and energy in this method if there is
- // need to keep HCatStorer lean and go fast.
- Type type = hcatFS.getType();
- switch(type){
-
- case BINARY:
- if (pigObj == null) {
- return null;
- }
- return ((DataByteArray)pigObj).get();
-
- case STRUCT:
- if (pigObj == null) {
- return null;
- }
- HCatSchema structSubSchema = hcatFS.getStructSubSchema();
- // Unwrap the tuple.
- List<Object> all = ((Tuple)pigObj).getAll();
- ArrayList<Object> converted = new ArrayList<Object>(all.size());
- for (int i = 0; i < all.size(); i++) {
- converted.add(getJavaObj(all.get(i), structSubSchema.get(i)));
- }
- return converted;
-
- case ARRAY:
- if (pigObj == null) {
- return null;
- }
- // Unwrap the bag.
- DataBag pigBag = (DataBag)pigObj;
- HCatFieldSchema tupFS = hcatFS.getArrayElementSchema().get(0);
- boolean needTuple = tupFS.getType() == Type.STRUCT;
- List<Object> bagContents = new ArrayList<Object>((int)pigBag.size());
- Iterator<Tuple> bagItr = pigBag.iterator();
-
- while(bagItr.hasNext()){
- // If there is only one element in tuple contained in bag, we throw away the tuple.
- bagContents.add(getJavaObj(needTuple ? bagItr.next() : bagItr.next().get(0), tupFS));
-
- }
- return bagContents;
- case MAP:
- if (pigObj == null) {
- return null;
- }
- Map<?,?> pigMap = (Map<?,?>)pigObj;
- Map<Object,Object> typeMap = new HashMap<Object, Object>();
- for(Entry<?, ?> entry: pigMap.entrySet()){
- // the value has a schema and not a FieldSchema
- typeMap.put(
- // Schema validation enforces that the Key is a String
- (String)entry.getKey(),
- getJavaObj(entry.getValue(), hcatFS.getMapValueSchema().get(0)));
- }
- return typeMap;
- case STRING:
- case INT:
- case BIGINT:
- case FLOAT:
- case DOUBLE:
- return pigObj;
- case SMALLINT:
- if (pigObj == null) {
- return null;
- }
- if ((Integer) pigObj < Short.MIN_VALUE || (Integer) pigObj > Short.MAX_VALUE) {
- throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
- hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- return ((Integer) pigObj).shortValue();
- case TINYINT:
- if (pigObj == null) {
- return null;
- }
- if ((Integer) pigObj < Byte.MIN_VALUE || (Integer) pigObj > Byte.MAX_VALUE) {
- throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
- hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- return ((Integer) pigObj).byteValue();
- case BOOLEAN:
- // would not pass schema validation anyway
- throw new BackendException("Incompatible type "+type+" found in hcat table schema: "+hcatFS, PigHCatUtil.PIG_EXCEPTION_CODE);
- default:
- throw new BackendException("Unexpected type "+type+" for value "+pigObj + (pigObj == null ? "" : " of class " + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- } catch (BackendException e) {
- // provide the path to the field in the error message
- throw new BackendException(
- (hcatFS.getName() == null ? " " : hcatFS.getName()+".") + e.getMessage(),
- e.getCause() == null ? e : e.getCause());
- }
- }
-
- @Override
- public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
-
- // Need to necessarily override this method since default impl assumes HDFS
- // based location string.
- return location;
- }
-
- @Override
- public void setStoreFuncUDFContextSignature(String signature) {
- sign = signature;
- }
-
-
- protected void doSchemaValidations(Schema pigSchema, HCatSchema tblSchema) throws FrontendException, HCatException{
-
- // Iterate through all the elements in Pig Schema and do validations as
- // dictated by semantics, consult HCatSchema of table when need be.
-
- for(FieldSchema pigField : pigSchema.getFields()){
- HCatFieldSchema hcatField = getColFromSchema(pigField.alias, tblSchema);
- validateSchema(pigField, hcatField);
- }
-
- try {
- PigHCatUtil.validateHCatTableSchemaFollowsPigRules(tblSchema);
- } catch (IOException e) {
- throw new FrontendException("HCatalog schema is not compatible with Pig: "+e.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, e);
- }
- }
-
-
- private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField)
- throws HCatException, FrontendException {
- validateAlias(pigField.alias);
- byte type = pigField.type;
- if(DataType.isComplex(type)){
- switch(type){
-
- case DataType.MAP:
- if(hcatField != null){
- if(hcatField.getMapKeyType() != Type.STRING){
- throw new FrontendException("Key Type of map must be String "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- // Map values can be primitive or complex
- }
- break;
-
- case DataType.BAG:
- HCatSchema arrayElementSchema = hcatField == null ? null : hcatField.getArrayElementSchema();
- for(FieldSchema innerField : pigField.schema.getField(0).schema.getFields()){
- validateSchema(innerField, getColFromSchema(pigField.alias, arrayElementSchema));
- }
- break;
-
- case DataType.TUPLE:
- HCatSchema structSubSchema = hcatField == null ? null : hcatField.getStructSubSchema();
- for(FieldSchema innerField : pigField.schema.getFields()){
- validateSchema(innerField, getColFromSchema(pigField.alias, structSubSchema));
- }
- break;
-
- default:
- throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- }
- }
-
- private void validateAlias(String alias) throws FrontendException{
- if(alias == null) {
- throw new FrontendException("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.", PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- if(alias.matches(".*[A-Z]+.*")) {
- throw new FrontendException("Column names should all be in lowercase. Invalid name found: "+alias, PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- }
-
- // Finds column by name in HCatSchema, if not found returns null.
- private HCatFieldSchema getColFromSchema(String alias, HCatSchema tblSchema){
- if (tblSchema != null) {
- for(HCatFieldSchema hcatField : tblSchema.getFields()){
- if(hcatField!=null && hcatField.getName()!= null && hcatField.getName().equalsIgnoreCase(alias)){
- return hcatField;
- }
- }
- }
- // Its a new column
- return null;
- }
-
- @Override
- public void cleanupOnFailure(String location, Job job) throws IOException {
- // No-op.
- }
-
- @Override
- public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException {
- }
+ protected HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException {
+ List<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(pigSchema.size());
+ for (FieldSchema fSchema : pigSchema.getFields()) {
+ try {
+ HCatFieldSchema hcatFieldSchema = getColFromSchema(fSchema.alias, tableSchema);
+
+ fieldSchemas.add(getHCatFSFromPigFS(fSchema, hcatFieldSchema));
+ } catch (HCatException he) {
+ throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+ }
+ }
+ return new HCatSchema(fieldSchemas);
+ }
+
+ public static boolean removeTupleFromBag(HCatFieldSchema hcatFieldSchema, FieldSchema bagFieldSchema) throws HCatException {
+ if (hcatFieldSchema != null && hcatFieldSchema.getArrayElementSchema().get(0).getType() != Type.STRUCT) {
+ return true;
+ }
+        // Column was not found in table schema. It's a new column.
+ List<FieldSchema> tupSchema = bagFieldSchema.schema.getFields();
+ if (hcatFieldSchema == null && tupSchema.size() == 1 && (tupSchema.get(0).schema == null || (tupSchema.get(0).type == DataType.TUPLE && tupSchema.get(0).schema.size() == 1))) {
+ return true;
+ }
+ return false;
+ }
+
+
+ private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema) throws FrontendException, HCatException {
+ byte type = fSchema.type;
+ switch (type) {
+
+ case DataType.CHARARRAY:
+ case DataType.BIGCHARARRAY:
+ return new HCatFieldSchema(fSchema.alias, Type.STRING, null);
+
+ case DataType.INTEGER:
+ if (hcatFieldSchema != null) {
+ if (!SUPPORTED_INTEGER_CONVERSIONS.contains(hcatFieldSchema.getType())) {
+ throw new FrontendException("Unsupported type: " + type + " in Pig's schema",
+ PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getType(), null);
+ } else {
+ return new HCatFieldSchema(fSchema.alias, Type.INT, null);
+ }
+
+ case DataType.LONG:
+ return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null);
+
+ case DataType.FLOAT:
+ return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null);
+
+ case DataType.DOUBLE:
+ return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null);
+
+ case DataType.BYTEARRAY:
+ return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
+
+ case DataType.BAG:
+ Schema bagSchema = fSchema.schema;
+ List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
+ FieldSchema field;
+ // Find out if we need to throw away the tuple or not.
+ if (removeTupleFromBag(hcatFieldSchema, fSchema)) {
+ field = bagSchema.getField(0).schema.getField(0);
+ } else {
+ field = bagSchema.getField(0);
+ }
+ arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema.getArrayElementSchema().get(0)));
+ return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), "");
+
+ case DataType.TUPLE:
+ List<String> fieldNames = new ArrayList<String>();
+ List<HCatFieldSchema> hcatFSs = new ArrayList<HCatFieldSchema>();
+ HCatSchema structSubSchema = hcatFieldSchema == null ? null : hcatFieldSchema.getStructSubSchema();
+ List<FieldSchema> fields = fSchema.schema.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ FieldSchema fieldSchema = fields.get(i);
+ fieldNames.add(fieldSchema.alias);
+ hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? null : structSubSchema.get(i)));
+ }
+ return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(hcatFSs), "");
+
+ case DataType.MAP: {
+            // Pig's schema contains no type information about a map's keys and
+            // values. So, if it's a new column, assume <string,string>; if it's an
+            // existing column, return whatever the existing column contains.
+
+ HCatFieldSchema valFS;
+ List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1);
+
+ if (hcatFieldSchema != null) {
+ return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, hcatFieldSchema.getMapValueSchema(), "");
+ }
+
+            // Column not found in target table. It's a new column; its schema is map<string,string>.
+ valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, "");
+ valFSList.add(valFS);
+ return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, new HCatSchema(valFSList), "");
+ }
+
+ default:
+ throw new FrontendException("Unsupported type: " + type + " in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ }
+
+ @Override
+ public void prepareToWrite(RecordWriter writer) throws IOException {
+ this.writer = writer;
+ computedSchema = (HCatSchema) ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).getProperty(COMPUTED_OUTPUT_SCHEMA));
+ }
+
+ @Override
+ public void putNext(Tuple tuple) throws IOException {
+
+ List<Object> outgoing = new ArrayList<Object>(tuple.size());
+
+ int i = 0;
+ for (HCatFieldSchema fSchema : computedSchema.getFields()) {
+ outgoing.add(getJavaObj(tuple.get(i++), fSchema));
+ }
+ try {
+ writer.write(null, new DefaultHCatRecord(outgoing));
+ } catch (InterruptedException e) {
+ throw new BackendException("Error while writing tuple: " + tuple, PigHCatUtil.PIG_EXCEPTION_CODE, e);
+ }
+ }
+
+ private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatException, BackendException {
+ try {
+
+            // The real work-horse. Spend time and energy in this method if there is
+            // a need to keep HCatStorer lean and fast.
+ Type type = hcatFS.getType();
+ switch (type) {
+
+ case BINARY:
+ if (pigObj == null) {
+ return null;
+ }
+ return ((DataByteArray) pigObj).get();
+
+ case STRUCT:
+ if (pigObj == null) {
+ return null;
+ }
+ HCatSchema structSubSchema = hcatFS.getStructSubSchema();
+ // Unwrap the tuple.
+ List<Object> all = ((Tuple) pigObj).getAll();
+ ArrayList<Object> converted = new ArrayList<Object>(all.size());
+ for (int i = 0; i < all.size(); i++) {
+ converted.add(getJavaObj(all.get(i), structSubSchema.get(i)));
+ }
+ return converted;
+
+ case ARRAY:
+ if (pigObj == null) {
+ return null;
+ }
+ // Unwrap the bag.
+ DataBag pigBag = (DataBag) pigObj;
+ HCatFieldSchema tupFS = hcatFS.getArrayElementSchema().get(0);
+ boolean needTuple = tupFS.getType() == Type.STRUCT;
+ List<Object> bagContents = new ArrayList<Object>((int) pigBag.size());
+ Iterator<Tuple> bagItr = pigBag.iterator();
+
+ while (bagItr.hasNext()) {
+                    // If the tuple contained in the bag has only one element, we throw away the tuple.
+ bagContents.add(getJavaObj(needTuple ? bagItr.next() : bagItr.next().get(0), tupFS));
+
+ }
+ return bagContents;
+ case MAP:
+ if (pigObj == null) {
+ return null;
+ }
+ Map<?, ?> pigMap = (Map<?, ?>) pigObj;
+ Map<Object, Object> typeMap = new HashMap<Object, Object>();
+ for (Entry<?, ?> entry : pigMap.entrySet()) {
+ // the value has a schema and not a FieldSchema
+ typeMap.put(
+ // Schema validation enforces that the Key is a String
+ (String) entry.getKey(),
+ getJavaObj(entry.getValue(), hcatFS.getMapValueSchema().get(0)));
+ }
+ return typeMap;
+ case STRING:
+ case INT:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ return pigObj;
+ case SMALLINT:
+ if (pigObj == null) {
+ return null;
+ }
+ if ((Integer) pigObj < Short.MIN_VALUE || (Integer) pigObj > Short.MAX_VALUE) {
+ throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
+ hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ return ((Integer) pigObj).shortValue();
+ case TINYINT:
+ if (pigObj == null) {
+ return null;
+ }
+ if ((Integer) pigObj < Byte.MIN_VALUE || (Integer) pigObj > Byte.MAX_VALUE) {
+ throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
+ hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ return ((Integer) pigObj).byteValue();
+ case BOOLEAN:
+ // would not pass schema validation anyway
+ throw new BackendException("Incompatible type " + type + " found in hcat table schema: " + hcatFS, PigHCatUtil.PIG_EXCEPTION_CODE);
+ default:
+ throw new BackendException("Unexpected type " + type + " for value " + pigObj + (pigObj == null ? "" : " of class " + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ } catch (BackendException e) {
+ // provide the path to the field in the error message
+ throw new BackendException(
+ (hcatFS.getName() == null ? " " : hcatFS.getName() + ".") + e.getMessage(),
+ e.getCause() == null ? e : e.getCause());
+ }
+ }
+
+ @Override
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+
+        // We need to override this method since the default impl assumes an
+        // HDFS-based location string.
+ return location;
+ }
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) {
+ sign = signature;
+ }
+
+
+ protected void doSchemaValidations(Schema pigSchema, HCatSchema tblSchema) throws FrontendException, HCatException {
+
+        // Iterate through all the elements in the Pig schema and do validations as
+        // dictated by semantics, consulting the table's HCatSchema when need be.
+
+ for (FieldSchema pigField : pigSchema.getFields()) {
+ HCatFieldSchema hcatField = getColFromSchema(pigField.alias, tblSchema);
+ validateSchema(pigField, hcatField);
+ }
+
+ try {
+ PigHCatUtil.validateHCatTableSchemaFollowsPigRules(tblSchema);
+ } catch (IOException e) {
+ throw new FrontendException("HCatalog schema is not compatible with Pig: " + e.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, e);
+ }
+ }
+
+
+ private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField)
+ throws HCatException, FrontendException {
+ validateAlias(pigField.alias);
+ byte type = pigField.type;
+ if (DataType.isComplex(type)) {
+ switch (type) {
+
+ case DataType.MAP:
+ if (hcatField != null) {
+ if (hcatField.getMapKeyType() != Type.STRING) {
+ throw new FrontendException("Key Type of map must be String " + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ // Map values can be primitive or complex
+ }
+ break;
+
+ case DataType.BAG:
+ HCatSchema arrayElementSchema = hcatField == null ? null : hcatField.getArrayElementSchema();
+ for (FieldSchema innerField : pigField.schema.getField(0).schema.getFields()) {
+ validateSchema(innerField, getColFromSchema(pigField.alias, arrayElementSchema));
+ }
+ break;
+
+ case DataType.TUPLE:
+ HCatSchema structSubSchema = hcatField == null ? null : hcatField.getStructSubSchema();
+ for (FieldSchema innerField : pigField.schema.getFields()) {
+ validateSchema(innerField, getColFromSchema(pigField.alias, structSubSchema));
+ }
+ break;
+
+ default:
+ throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ }
+ }
+
+ private void validateAlias(String alias) throws FrontendException {
+ if (alias == null) {
+ throw new FrontendException("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.", PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ if (alias.matches(".*[A-Z]+.*")) {
+ throw new FrontendException("Column names should all be in lowercase. Invalid name found: " + alias, PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ }
+
+ // Finds column by name in HCatSchema, if not found returns null.
+ private HCatFieldSchema getColFromSchema(String alias, HCatSchema tblSchema) {
+ if (tblSchema != null) {
+ for (HCatFieldSchema hcatField : tblSchema.getFields()) {
+ if (hcatField != null && hcatField.getName() != null && hcatField.getName().equalsIgnoreCase(alias)) {
+ return hcatField;
+ }
+ }
+ }
+        // It's a new column.
+ return null;
+ }
+
+ @Override
+ public void cleanupOnFailure(String location, Job job) throws IOException {
+ // No-op.
+ }
+
+ @Override
+ public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException {
+ }
}
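
For readers following the patch: the HCatBaseStorer constructor above parses its first argument as a comma-separated list of key=value partition specs, trimming both halves and rejecting anything else. A minimal standalone sketch of the same parsing, using hypothetical class and method names that are not part of the patch:

import java.util.LinkedHashMap;
import java.util.Map;

public class PartSpecSketch {
    // Parses "k1=v1,k2=v2" the way the HCatBaseStorer constructor does:
    // split on commas, then on '=', trim both halves, fail otherwise.
    static Map<String, String> parsePartitionSpec(String partSpecs) {
        Map<String, String> partitions = new LinkedHashMap<String, String>();
        if (partSpecs != null && !partSpecs.trim().isEmpty()) {
            for (String partKVP : partSpecs.split(",")) {
                String[] partKV = partKVP.split("=");
                if (partKV.length != 2) {
                    throw new IllegalArgumentException(
                        "Invalid partition column specification. " + partSpecs);
                }
                partitions.put(partKV[0].trim(), partKV[1].trim());
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        // prints {ds=20120910, region=us}
        System.out.println(parsePartitionSpec("ds=20120910, region=us"));
    }
}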
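
Similarly, the SMALLINT and TINYINT branches of getJavaObj range-check the Integer that Pig hands over before narrowing it, instead of silently truncating. A sketch of that check, again under hypothetical names:

public class NarrowingSketch {
    // Mirrors the SMALLINT branch: reject values outside short range,
    // then narrow explicitly.
    static short toSmallint(int pigObj, String column) {
        if (pigObj < Short.MIN_VALUE || pigObj > Short.MAX_VALUE) {
            throw new IllegalArgumentException("Value " + pigObj
                + " is outside the bounds of column " + column + " with type smallint");
        }
        return (short) pigObj;
    }

    public static void main(String[] args) {
        System.out.println(toSmallint(123, "c1"));   // 123
        try {
            toSmallint(70000, "c1");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());      // out-of-range value rejected
        }
    }
}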
Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java Mon Sep 10 23:28:55 2012
@@ -51,59 +51,59 @@ import org.apache.pig.impl.util.UDFConte
public class HCatLoader extends HCatBaseLoader {
- private static final String PARTITION_FILTER = "partition.filter"; // for future use
+ private static final String PARTITION_FILTER = "partition.filter"; // for future use
- private HCatInputFormat hcatInputFormat = null;
- private String dbName;
- private String tableName;
- private String hcatServerUri;
- private String partitionFilterString;
- private final PigHCatUtil phutil = new PigHCatUtil();
-
- // Signature for wrapped loader, see comments in LoadFuncBasedInputDriver.initialize
- final public static String INNER_SIGNATURE = "hcatloader.inner.signature";
- final public static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature";
- // A hash map which stores job credentials. The key is a signature passed by Pig, which is
- //unique to the load func and input file name (table, in our case).
- private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
-
- @Override
- public InputFormat<?,?> getInputFormat() throws IOException {
- if(hcatInputFormat == null) {
- hcatInputFormat = new HCatInputFormat();
- }
- return hcatInputFormat;
- }
-
- @Override
- public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
- return location;
- }
-
-@Override
- public void setLocation(String location, Job job) throws IOException {
-
- UDFContext udfContext = UDFContext.getUDFContext();
- Properties udfProps = udfContext.getUDFProperties(this.getClass(),
- new String[]{signature});
- job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + signature);
- Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
- dbName = dbTablePair.first;
- tableName = dbTablePair.second;
-
- RequiredFieldList requiredFieldsInfo = (RequiredFieldList) udfProps
- .get(PRUNE_PROJECTION_INFO);
- // get partitionFilterString stored in the UDFContext - it would have
- // been stored there by an earlier call to setPartitionFilter
- // call setInput on HCatInputFormat only in the frontend because internally
- // it makes calls to the hcat server - we don't want these to happen in
- // the backend
- // in the hadoop front end mapred.task.id property will not be set in
- // the Configuration
+ private HCatInputFormat hcatInputFormat = null;
+ private String dbName;
+ private String tableName;
+ private String hcatServerUri;
+ private String partitionFilterString;
+ private final PigHCatUtil phutil = new PigHCatUtil();
+
+ // Signature for wrapped loader, see comments in LoadFuncBasedInputDriver.initialize
+ final public static String INNER_SIGNATURE = "hcatloader.inner.signature";
+ final public static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature";
+ // A hash map which stores job credentials. The key is a signature passed by Pig, which is
+    // unique to the load func and input file name (table, in our case).
+ private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
+
+ @Override
+ public InputFormat<?, ?> getInputFormat() throws IOException {
+ if (hcatInputFormat == null) {
+ hcatInputFormat = new HCatInputFormat();
+ }
+ return hcatInputFormat;
+ }
+
+ @Override
+ public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+ return location;
+ }
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+
+ UDFContext udfContext = UDFContext.getUDFContext();
+ Properties udfProps = udfContext.getUDFProperties(this.getClass(),
+ new String[]{signature});
+ job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + signature);
+ Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
+ dbName = dbTablePair.first;
+ tableName = dbTablePair.second;
+
+ RequiredFieldList requiredFieldsInfo = (RequiredFieldList) udfProps
+ .get(PRUNE_PROJECTION_INFO);
+ // get partitionFilterString stored in the UDFContext - it would have
+ // been stored there by an earlier call to setPartitionFilter
+        // call setInput on HCatInputFormat only in the frontend because internally
+        // it makes calls to the hcat server - we don't want these to happen in
+        // the backend. In the Hadoop front end, the mapred.task.id property will
+        // not be set in the Configuration.
if (udfProps.containsKey(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET)) {
- for( Enumeration<Object> emr = udfProps.keys();emr.hasMoreElements();) {
+ for (Enumeration<Object> emr = udfProps.keys(); emr.hasMoreElements(); ) {
PigHCatUtil.getConfigFromUDFProperties(udfProps,
- job.getConfiguration(), emr.nextElement().toString());
+ job.getConfiguration(), emr.nextElement().toString());
}
if (!HCatUtil.checkJobContextIfRunningFromBackend(job)) {
//Combine credentials and credentials from job takes precedence for freshness
@@ -114,12 +114,12 @@ public class HCatLoader extends HCatBase
} else {
Job clone = new Job(job.getConfiguration());
HCatInputFormat.setInput(job, InputJobInfo.create(dbName,
- tableName, getPartitionFilterString()));
+ tableName, getPartitionFilterString()));
            // We will store all the new/changed properties in the job in the
            // udf context, so that the HCatInputFormat.setInput method need not
            // be called many times.
- for (Entry<String,String> keyValue : job.getConfiguration()) {
+ for (Entry<String, String> keyValue : job.getConfiguration()) {
String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
udfProps.put(keyValue.getKey(), keyValue.getValue());
@@ -144,129 +144,129 @@ public class HCatLoader extends HCatBase
// here will ensure we communicate to HCatInputFormat about pruned
// projections at getSplits() and createRecordReader() time
- if(requiredFieldsInfo != null) {
- // convert to hcatschema and pass to HCatInputFormat
- try {
- outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(),signature,this.getClass());
- HCatInputFormat.setOutputSchema(job, outputSchema);
- } catch (Exception e) {
- throw new IOException(e);
- }
- } else{
- // else - this means pig's optimizer never invoked the pushProjection
- // method - so we need all fields and hence we should not call the
- // setOutputSchema on HCatInputFormat
- if (HCatUtil.checkJobContextIfRunningFromBackend(job)){
+ if (requiredFieldsInfo != null) {
+ // convert to hcatschema and pass to HCatInputFormat
try {
- HCatSchema hcatTableSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA);
- outputSchema = hcatTableSchema;
- HCatInputFormat.setOutputSchema(job, outputSchema);
+ outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(), signature, this.getClass());
+ HCatInputFormat.setOutputSchema(job, outputSchema);
} catch (Exception e) {
- throw new IOException(e);
+ throw new IOException(e);
+ }
+ } else {
+            // else - this means Pig's optimizer never invoked the pushProjection
+            // method - so we need all fields and hence we should not call
+            // setOutputSchema on HCatInputFormat
+ if (HCatUtil.checkJobContextIfRunningFromBackend(job)) {
+ try {
+ HCatSchema hcatTableSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA);
+ outputSchema = hcatTableSchema;
+ HCatInputFormat.setOutputSchema(job, outputSchema);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
- }
}
- }
+ }
- @Override
- public String[] getPartitionKeys(String location, Job job)
- throws IOException {
- Table table = phutil.getTable(location,
- hcatServerUri!=null?hcatServerUri:PigHCatUtil.getHCatServerUri(job),
+ @Override
+ public String[] getPartitionKeys(String location, Job job)
+ throws IOException {
+ Table table = phutil.getTable(location,
+ hcatServerUri != null ? hcatServerUri : PigHCatUtil.getHCatServerUri(job),
PigHCatUtil.getHCatServerPrincipal(job));
- List<FieldSchema> tablePartitionKeys = table.getPartitionKeys();
- String[] partitionKeys = new String[tablePartitionKeys.size()];
- for(int i = 0; i < tablePartitionKeys.size(); i++) {
- partitionKeys[i] = tablePartitionKeys.get(i).getName();
- }
- return partitionKeys;
- }
-
- @Override
- public ResourceSchema getSchema(String location, Job job) throws IOException {
- HCatContext.getInstance().mergeConf(job.getConfiguration());
- HCatContext.getInstance().getConf().setBoolean(
- HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true);
+ List<FieldSchema> tablePartitionKeys = table.getPartitionKeys();
+ String[] partitionKeys = new String[tablePartitionKeys.size()];
+ for (int i = 0; i < tablePartitionKeys.size(); i++) {
+ partitionKeys[i] = tablePartitionKeys.get(i).getName();
+ }
+ return partitionKeys;
+ }
+
+ @Override
+ public ResourceSchema getSchema(String location, Job job) throws IOException {
+ HCatContext.getInstance().mergeConf(job.getConfiguration());
+ HCatContext.getInstance().getConf().setBoolean(
+ HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true);
- Table table = phutil.getTable(location,
- hcatServerUri!=null?hcatServerUri:PigHCatUtil.getHCatServerUri(job),
+ Table table = phutil.getTable(location,
+ hcatServerUri != null ? hcatServerUri : PigHCatUtil.getHCatServerUri(job),
PigHCatUtil.getHCatServerPrincipal(job));
- HCatSchema hcatTableSchema = HCatUtil.getTableSchemaWithPtnCols(table);
- try {
- PigHCatUtil.validateHCatTableSchemaFollowsPigRules(hcatTableSchema);
- } catch (IOException e){
- throw new PigException(
- "Table schema incompatible for reading through HCatLoader :" + e.getMessage()
- + ";[Table schema was "+ hcatTableSchema.toString() +"]"
- ,PigHCatUtil.PIG_EXCEPTION_CODE, e);
- }
- storeInUDFContext(signature, HCatConstants.HCAT_TABLE_SCHEMA, hcatTableSchema);
- outputSchema = hcatTableSchema;
- return PigHCatUtil.getResourceSchema(hcatTableSchema);
- }
-
- @Override
- public void setPartitionFilter(Expression partitionFilter) throws IOException {
- // convert the partition filter expression into a string expected by
- // hcat and pass it in setLocation()
-
- partitionFilterString = getHCatComparisonString(partitionFilter);
-
- // store this in the udf context so we can get it later
- storeInUDFContext(signature,
- PARTITION_FILTER, partitionFilterString);
- }
-
- /**
- * Get statistics about the data to be loaded. Only input data size is implemented at this time.
- */
- @Override
- public ResourceStatistics getStatistics(String location, Job job) throws IOException {
- try {
- ResourceStatistics stats = new ResourceStatistics();
- InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(
- job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
- stats.setmBytes(getSizeInBytes(inputJobInfo) / 1024 / 1024);
- return stats;
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- private String getPartitionFilterString() {
- if(partitionFilterString == null) {
- Properties props = UDFContext.getUDFContext().getUDFProperties(
- this.getClass(), new String[] {signature});
- partitionFilterString = props.getProperty(PARTITION_FILTER);
- }
- return partitionFilterString;
- }
-
- private String getHCatComparisonString(Expression expr) {
- if(expr instanceof BinaryExpression){
- // call getHCatComparisonString on lhs and rhs, and and join the
- // results with OpType string
-
- // we can just use OpType.toString() on all Expression types except
- // Equal, NotEqualt since Equal has '==' in toString() and
- // we need '='
- String opStr = null;
- switch(expr.getOpType()){
- case OP_EQ :
- opStr = " = ";
- break;
- default:
- opStr = expr.getOpType().toString();
- }
- BinaryExpression be = (BinaryExpression)expr;
- return "(" + getHCatComparisonString(be.getLhs()) +
- opStr +
- getHCatComparisonString(be.getRhs()) + ")";
- } else {
- // should be a constant or column
- return expr.toString();
+ HCatSchema hcatTableSchema = HCatUtil.getTableSchemaWithPtnCols(table);
+ try {
+ PigHCatUtil.validateHCatTableSchemaFollowsPigRules(hcatTableSchema);
+ } catch (IOException e) {
+ throw new PigException(
+ "Table schema incompatible for reading through HCatLoader :" + e.getMessage()
+ + ";[Table schema was " + hcatTableSchema.toString() + "]"
+ , PigHCatUtil.PIG_EXCEPTION_CODE, e);
+ }
+ storeInUDFContext(signature, HCatConstants.HCAT_TABLE_SCHEMA, hcatTableSchema);
+ outputSchema = hcatTableSchema;
+ return PigHCatUtil.getResourceSchema(hcatTableSchema);
+ }
+
+ @Override
+ public void setPartitionFilter(Expression partitionFilter) throws IOException {
+ // convert the partition filter expression into a string expected by
+ // hcat and pass it in setLocation()
+
+ partitionFilterString = getHCatComparisonString(partitionFilter);
+
+ // store this in the udf context so we can get it later
+ storeInUDFContext(signature,
+ PARTITION_FILTER, partitionFilterString);
+ }
+
+ /**
+ * Get statistics about the data to be loaded. Only input data size is implemented at this time.
+ */
+ @Override
+ public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+ try {
+ ResourceStatistics stats = new ResourceStatistics();
+ InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(
+ job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
+ stats.setmBytes(getSizeInBytes(inputJobInfo) / 1024 / 1024);
+ return stats;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private String getPartitionFilterString() {
+ if (partitionFilterString == null) {
+ Properties props = UDFContext.getUDFContext().getUDFProperties(
+ this.getClass(), new String[]{signature});
+ partitionFilterString = props.getProperty(PARTITION_FILTER);
+ }
+ return partitionFilterString;
+ }
+
+ private String getHCatComparisonString(Expression expr) {
+ if (expr instanceof BinaryExpression) {
+            // call getHCatComparisonString on lhs and rhs, and join the
+            // results with the OpType string
+
+ // we can just use OpType.toString() on all Expression types except
+            // Equal and NotEqual, since Equal has '==' in toString() and
+ // we need '='
+ String opStr = null;
+ switch (expr.getOpType()) {
+ case OP_EQ:
+ opStr = " = ";
+ break;
+ default:
+ opStr = expr.getOpType().toString();
+ }
+ BinaryExpression be = (BinaryExpression) expr;
+ return "(" + getHCatComparisonString(be.getLhs()) +
+ opStr +
+ getHCatComparisonString(be.getRhs()) + ")";
+ } else {
+ // should be a constant or column
+ return expr.toString();
+ }
}
- }
}
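
getHCatComparisonString above turns a Pig filter expression into the string form HCatalog expects: OP_EQ is rendered as " = " (Pig's toString() would give "=="), every other operator uses its OpType string, and binary expressions are parenthesized recursively. A self-contained sketch of that pattern; the Node/Leaf/Binary types below are hypothetical stand-ins for Pig's Expression and BinaryExpression classes:

public class FilterStringSketch {
    interface Node { }
    static class Leaf implements Node {           // a column name or constant
        final String text;
        Leaf(String text) { this.text = text; }
    }
    static class Binary implements Node {         // lhs <op> rhs
        final Node lhs, rhs;
        final String op;
        Binary(Node lhs, String op, Node rhs) { this.lhs = lhs; this.op = op; this.rhs = rhs; }
    }

    // Equality becomes " = "; other operators pass through; recurse with parens.
    static String toHCatFilter(Node expr) {
        if (expr instanceof Binary) {
            Binary be = (Binary) expr;
            String opStr = "==".equals(be.op.trim()) ? " = " : be.op;
            return "(" + toHCatFilter(be.lhs) + opStr + toHCatFilter(be.rhs) + ")";
        }
        return ((Leaf) expr).text;
    }

    public static void main(String[] args) {
        Node filter = new Binary(new Leaf("ds"), "==", new Leaf("'20120910'"));
        System.out.println(toHCatFilter(filter));  // (ds = '20120910')
    }
}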
Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java Mon Sep 10 23:28:55 2012
@@ -50,114 +50,114 @@ import org.apache.pig.impl.util.UDFConte
public class HCatStorer extends HCatBaseStorer {
- // Signature for wrapped storer, see comments in LoadFuncBasedInputDriver.initialize
- final public static String INNER_SIGNATURE = "hcatstorer.inner.signature";
- final public static String INNER_SIGNATURE_PREFIX = "hcatstorer_inner_signature";
- // A hash map which stores job credentials. The key is a signature passed by Pig, which is
- //unique to the store func and out file name (table, in our case).
- private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
-
-
- public HCatStorer(String partSpecs, String schema) throws Exception {
- super(partSpecs, schema);
- }
-
- public HCatStorer(String partSpecs) throws Exception {
- this(partSpecs, null);
- }
-
- public HCatStorer() throws Exception{
- this(null,null);
- }
-
- @Override
- public OutputFormat getOutputFormat() throws IOException {
- return new HCatOutputFormat();
- }
-
- @Override
- public void setStoreLocation(String location, Job job) throws IOException {
- HCatContext.getInstance().mergeConf(job.getConfiguration());
- HCatContext.getInstance().getConf().setBoolean(
- HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, false);
-
- Configuration config = job.getConfiguration();
- config.set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
- Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
- this.getClass(), new String[] { sign });
- String[] userStr = location.split("\\.");
-
- if (udfProps.containsKey(HCatConstants.HCAT_PIG_STORER_LOCATION_SET)) {
- for(Enumeration<Object> emr = udfProps.keys();emr.hasMoreElements();){
- PigHCatUtil.getConfigFromUDFProperties(udfProps, config, emr.nextElement().toString());
- }
- Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + sign);
- if (crd != null) {
- job.getCredentials().addAll(crd);
- }
- } else {
- Job clone = new Job(job.getConfiguration());
- OutputJobInfo outputJobInfo;
- if (userStr.length == 2) {
- outputJobInfo = OutputJobInfo.create(userStr[0], userStr[1], partitions);
- } else if (userStr.length == 1) {
- outputJobInfo = OutputJobInfo.create(null, userStr[0], partitions);
- } else {
- throw new FrontendException("location " + location
- + " is invalid. It must be of the form [db.]table",
- PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- Schema schema = (Schema) ObjectSerializer.deserialize(udfProps.getProperty(PIG_SCHEMA));
- if (schema != null) {
- pigSchema = schema;
- }
- if (pigSchema == null) {
- throw new FrontendException(
- "Schema for data cannot be determined.",
- PigHCatUtil.PIG_EXCEPTION_CODE);
- }
- try {
- HCatOutputFormat.setOutput(job, outputJobInfo);
- } catch (HCatException he) {
- // pass the message to the user - essentially something about
- // the table
- // information passed to HCatOutputFormat was not right
- throw new PigException(he.getMessage(),
- PigHCatUtil.PIG_EXCEPTION_CODE, he);
- }
- HCatSchema hcatTblSchema = HCatOutputFormat.getTableSchema(job);
- try {
- doSchemaValidations(pigSchema, hcatTblSchema);
- } catch (HCatException he) {
- throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
- }
- computedSchema = convertPigSchemaToHCatSchema(pigSchema, hcatTblSchema);
- HCatOutputFormat.setSchema(job, computedSchema);
- udfProps.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
-
- // We will store all the new /changed properties in the job in the
- // udf context, so the the HCatOutputFormat.setOutput and setSchema
- // methods need not be called many times.
- for ( Entry<String,String> keyValue : job.getConfiguration()) {
- String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
- if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
- udfProps.put(keyValue.getKey(), keyValue.getValue());
+ // Signature for wrapped storer, see comments in LoadFuncBasedInputDriver.initialize
+ final public static String INNER_SIGNATURE = "hcatstorer.inner.signature";
+ final public static String INNER_SIGNATURE_PREFIX = "hcatstorer_inner_signature";
+ // A hash map which stores job credentials. The key is a signature passed by Pig, which is
+    // unique to the store func and out file name (table, in our case).
+ private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
+
+
+ public HCatStorer(String partSpecs, String schema) throws Exception {
+ super(partSpecs, schema);
+ }
+
+ public HCatStorer(String partSpecs) throws Exception {
+ this(partSpecs, null);
+ }
+
+ public HCatStorer() throws Exception {
+ this(null, null);
+ }
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ return new HCatOutputFormat();
+ }
+
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ HCatContext.getInstance().mergeConf(job.getConfiguration());
+ HCatContext.getInstance().getConf().setBoolean(
+ HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, false);
+
+ Configuration config = job.getConfiguration();
+ config.set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
+ Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
+ this.getClass(), new String[]{sign});
+ String[] userStr = location.split("\\.");
+
+ if (udfProps.containsKey(HCatConstants.HCAT_PIG_STORER_LOCATION_SET)) {
+ for (Enumeration<Object> emr = udfProps.keys(); emr.hasMoreElements(); ) {
+ PigHCatUtil.getConfigFromUDFProperties(udfProps, config, emr.nextElement().toString());
+ }
+ Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + sign);
+ if (crd != null) {
+ job.getCredentials().addAll(crd);
+ }
+ } else {
+ Job clone = new Job(job.getConfiguration());
+ OutputJobInfo outputJobInfo;
+ if (userStr.length == 2) {
+ outputJobInfo = OutputJobInfo.create(userStr[0], userStr[1], partitions);
+ } else if (userStr.length == 1) {
+ outputJobInfo = OutputJobInfo.create(null, userStr[0], partitions);
+ } else {
+ throw new FrontendException("location " + location
+ + " is invalid. It must be of the form [db.]table",
+ PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ Schema schema = (Schema) ObjectSerializer.deserialize(udfProps.getProperty(PIG_SCHEMA));
+ if (schema != null) {
+ pigSchema = schema;
+ }
+ if (pigSchema == null) {
+ throw new FrontendException(
+ "Schema for data cannot be determined.",
+ PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ try {
+ HCatOutputFormat.setOutput(job, outputJobInfo);
+ } catch (HCatException he) {
+                // pass the message to the user - essentially that the table
+                // information passed to HCatOutputFormat was not right
+ throw new PigException(he.getMessage(),
+ PigHCatUtil.PIG_EXCEPTION_CODE, he);
+ }
+ HCatSchema hcatTblSchema = HCatOutputFormat.getTableSchema(job);
+ try {
+ doSchemaValidations(pigSchema, hcatTblSchema);
+ } catch (HCatException he) {
+ throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+ }
+ computedSchema = convertPigSchemaToHCatSchema(pigSchema, hcatTblSchema);
+ HCatOutputFormat.setSchema(job, computedSchema);
+ udfProps.setProperty(COMPUTED_OUTPUT_SCHEMA, ObjectSerializer.serialize(computedSchema));
+
+            // We will store all the new/changed properties in the job in the
+            // udf context, so that the HCatOutputFormat.setOutput and setSchema
+            // methods need not be called many times.
+ for (Entry<String, String> keyValue : job.getConfiguration()) {
+ String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
+ if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
+ udfProps.put(keyValue.getKey(), keyValue.getValue());
+ }
+ }
+            // Store credentials in a private hash map and not the udf context to
+            // make sure they are not public.
+ jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + sign, job.getCredentials());
+ udfProps.put(HCatConstants.HCAT_PIG_STORER_LOCATION_SET, true);
}
- }
- //Store credentials in a private hash map and not the udf context to
- // make sure they are not public.
- jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + sign,job.getCredentials());
- udfProps.put(HCatConstants.HCAT_PIG_STORER_LOCATION_SET, true);
- }
- }
-
- @Override
- public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
- HCatHadoopShims.Instance.get().commitJob(getOutputFormat(), schema, arg1, job);
- }
-
- @Override
- public void cleanupOnFailure(String location, Job job) throws IOException {
- HCatHadoopShims.Instance.get().abortJob(getOutputFormat(), job);
- }
+ }
+
+ @Override
+ public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
+ HCatHadoopShims.Instance.get().commitJob(getOutputFormat(), schema, arg1, job);
+ }
+
+ @Override
+ public void cleanupOnFailure(String location, Job job) throws IOException {
+ HCatHadoopShims.Instance.get().abortJob(getOutputFormat(), job);
+ }
}
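
Finally, setStoreLocation above resolves its location argument as "[db.]table": one dot-separated component means a table in the default database, two mean db.table, anything else is rejected. A minimal sketch of that handling, again with hypothetical names:

public class LocationSketch {
    // Returns {db, table}; db is null when the location names only a table.
    static String[] splitLocation(String location) {
        String[] userStr = location.split("\\.");
        if (userStr.length == 2) {
            return new String[]{userStr[0], userStr[1]};
        } else if (userStr.length == 1) {
            return new String[]{null, userStr[0]};
        }
        throw new IllegalArgumentException("location " + location
            + " is invalid. It must be of the form [db.]table");
    }

    public static void main(String[] args) {
        String[] dbTable = splitLocation("mydb.page_views");
        System.out.println(dbTable[0] + " / " + dbTable[1]);  // mydb / page_views
    }
}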