Posted to commits@hive.apache.org by kh...@apache.org on 2013/09/06 02:49:17 UTC
svn commit: r1520466 [13/18] - in /hive/trunk/hcatalog:
core/src/main/java/org/apache/hcatalog/cli/
core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/
core/src/main/java/org/apache/hcatalog/common/
core/src/main/java/org/apache/hcatalog/data/...
Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.PartInfo;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Base class for HCatLoader and HCatEximLoader
+ */
+
+abstract class HCatBaseLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
+
+ protected static final String PRUNE_PROJECTION_INFO = "prune.projection.info";
+
+ private RecordReader<?, ?> reader;
+ protected String signature;
+
+ HCatSchema outputSchema = null;
+
+
+ @Override
+ public Tuple getNext() throws IOException {
+ try {
+ HCatRecord hr = (HCatRecord) (reader.nextKeyValue() ? reader.getCurrentValue() : null);
+ Tuple t = PigHCatUtil.transformToTuple(hr, outputSchema);
+ // TODO : we were discussing an iter interface, and also a LazyTuple
+ // change this when plans for that solidify.
+ return t;
+ } catch (ExecException e) {
+ int errCode = 6018;
+ String errMsg = "Error while reading input";
+ throw new ExecException(errMsg, errCode,
+ PigException.REMOTE_ENVIRONMENT, e);
+ } catch (Exception eOther) {
+ int errCode = 6018;
+ String errMsg = "Error converting read value to tuple";
+ throw new ExecException(errMsg, errCode,
+ PigException.REMOTE_ENVIRONMENT, eOther);
+ }
+
+ }
+
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit arg1) throws IOException {
+ this.reader = reader;
+ }
+
+ @Override
+ public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+ // statistics not implemented currently
+ return null;
+ }
+
+ @Override
+ public List<OperatorSet> getFeatures() {
+ return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+ }
+
+ @Override
+ public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldsInfo) throws FrontendException {
+ // Store the required fields information in the UDFContext so that we
+ // can retrieve it later.
+ storeInUDFContext(signature, PRUNE_PROJECTION_INFO, requiredFieldsInfo);
+
+ // HCat will always prune columns based on what we ask of it - so the
+ // response is true
+ return new RequiredFieldResponse(true);
+ }
+
+ @Override
+ public void setUDFContextSignature(String signature) {
+ this.signature = signature;
+ }
+
+
+ // helper methods
+ protected void storeInUDFContext(String signature, String key, Object value) {
+ UDFContext udfContext = UDFContext.getUDFContext();
+ Properties props = udfContext.getUDFProperties(
+ this.getClass(), new String[]{signature});
+ props.put(key, value);
+ }
+
+ /**
+ * A utility method to get the size of inputs. This is accomplished by summing the
+ * size of all input paths on supported FileSystems. Locations whose size cannot be
+ * determined are ignored. Note non-FileSystem and unpartitioned locations will not
+ * report their input size by default.
+ */
+ protected static long getSizeInBytes(InputJobInfo inputJobInfo) throws IOException {
+ Configuration conf = new Configuration();
+ long sizeInBytes = 0;
+
+ for (PartInfo partInfo : inputJobInfo.getPartitions()) {
+ try {
+ Path p = new Path(partInfo.getLocation());
+ if (p.getFileSystem(conf).isFile(p)) {
+ sizeInBytes += p.getFileSystem(conf).getFileStatus(p).getLen();
+ } else {
+ FileStatus[] fileStatuses = p.getFileSystem(conf).listStatus(p);
+ if (fileStatuses != null) {
+ for (FileStatus child : fileStatuses) {
+ sizeInBytes += child.getLen();
+ }
+ }
+ }
+ } catch (IOException e) {
+ // Report size to the extent possible.
+ }
+ }
+
+ return sizeInBytes;
+ }
+}
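
The storeInUDFContext() helper above is the mechanism HCatBaseLoader and its subclasses use to hand state (such as the pruned-projection info) from Pig's frontend to the backend tasks. A minimal, self-contained sketch of that UDFContext pattern - with a hypothetical signature and a plain string value standing in for the real RequiredFieldList - might look like:

    import java.util.Properties;
    import org.apache.pig.impl.util.UDFContext;

    public class UdfContextSketch {
        public static void main(String[] args) {
            // Properties are keyed by (class, signature) so that several instances of
            // the same loader in one script do not clobber each other's state.
            String signature = "loader_1"; // hypothetical signature supplied by Pig
            Properties props = UDFContext.getUDFContext()
                .getUDFProperties(UdfContextSketch.class, new String[]{signature});
            props.put("prune.projection.info", "a,b,c"); // stored on the frontend
            // ...later, typically after the context has been shipped to the backend...
            Object stored = UDFContext.getUDFContext()
                .getUDFProperties(UdfContextSketch.class, new String[]{signature})
                .get("prune.projection.info");
            System.out.println(stored); // prints a,b,c
        }
    }
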
Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * Base class for HCatStorer and HCatEximStorer
+ *
+ */
+
+abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata {
+
+ private static final List<Type> SUPPORTED_INTEGER_CONVERSIONS =
+ Lists.newArrayList(Type.TINYINT, Type.SMALLINT, Type.INT);
+ protected static final String COMPUTED_OUTPUT_SCHEMA = "hcat.output.schema";
+ protected final List<String> partitionKeys;
+ protected final Map<String, String> partitions;
+ protected Schema pigSchema;
+ private RecordWriter<WritableComparable<?>, HCatRecord> writer;
+ protected HCatSchema computedSchema;
+ protected static final String PIG_SCHEMA = "hcat.pig.store.schema";
+ protected String sign;
+
+ public HCatBaseStorer(String partSpecs, String schema) throws Exception {
+
+ partitionKeys = new ArrayList<String>();
+ partitions = new HashMap<String, String>();
+ if (partSpecs != null && !partSpecs.trim().isEmpty()) {
+ String[] partKVPs = partSpecs.split(",");
+ for (String partKVP : partKVPs) {
+ String[] partKV = partKVP.split("=");
+ if (partKV.length == 2) {
+ String partKey = partKV[0].trim();
+ partitionKeys.add(partKey);
+ partitions.put(partKey, partKV[1].trim());
+ } else {
+ throw new FrontendException("Invalid partition column specification. " + partSpecs, PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ }
+ }
+
+ if (schema != null) {
+ pigSchema = Utils.getSchemaFromString(schema);
+ }
+
+ }
+
+ @Override
+ public void checkSchema(ResourceSchema resourceSchema) throws IOException {
+
+ /* The schema provided by the user and the schema computed by Pig
+ * at the time of calling store must match.
+ */
+ Schema runtimeSchema = Schema.getPigSchema(resourceSchema);
+ if (pigSchema != null) {
+ if (!Schema.equals(runtimeSchema, pigSchema, false, true)) {
+ throw new FrontendException("Schema provided in store statement doesn't match with the Schema" +
+ "returned by Pig run-time. Schema provided in HCatStorer: " + pigSchema.toString() + " Schema received from Pig runtime: " + runtimeSchema.toString(), PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ } else {
+ pigSchema = runtimeSchema;
+ }
+ UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).setProperty(PIG_SCHEMA, ObjectSerializer.serialize(pigSchema));
+ }
+
+ /** Constructs an HCatSchema from the pigSchema. The passed tableSchema is the existing
+ * schema of the table in the metastore.
+ */
+ protected HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException {
+ List<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(pigSchema.size());
+ for (FieldSchema fSchema : pigSchema.getFields()) {
+ try {
+ HCatFieldSchema hcatFieldSchema = getColFromSchema(fSchema.alias, tableSchema);
+
+ fieldSchemas.add(getHCatFSFromPigFS(fSchema, hcatFieldSchema));
+ } catch (HCatException he) {
+ throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+ }
+ }
+ return new HCatSchema(fieldSchemas);
+ }
+
+ public static boolean removeTupleFromBag(HCatFieldSchema hcatFieldSchema, FieldSchema bagFieldSchema) throws HCatException {
+ if (hcatFieldSchema != null && hcatFieldSchema.getArrayElementSchema().get(0).getType() != Type.STRUCT) {
+ return true;
+ }
+ // Column was not found in the table schema. It's a new column.
+ List<FieldSchema> tupSchema = bagFieldSchema.schema.getFields();
+ if (hcatFieldSchema == null && tupSchema.size() == 1 && (tupSchema.get(0).schema == null || (tupSchema.get(0).type == DataType.TUPLE && tupSchema.get(0).schema.size() == 1))) {
+ return true;
+ }
+ return false;
+ }
+
+
+ private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema) throws FrontendException, HCatException {
+ byte type = fSchema.type;
+ switch (type) {
+
+ case DataType.CHARARRAY:
+ case DataType.BIGCHARARRAY:
+ return new HCatFieldSchema(fSchema.alias, Type.STRING, null);
+
+ case DataType.INTEGER:
+ if (hcatFieldSchema != null) {
+ if (!SUPPORTED_INTEGER_CONVERSIONS.contains(hcatFieldSchema.getType())) {
+ throw new FrontendException("Unsupported type: " + type + " in Pig's schema",
+ PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getType(), null);
+ } else {
+ return new HCatFieldSchema(fSchema.alias, Type.INT, null);
+ }
+
+ case DataType.LONG:
+ return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null);
+
+ case DataType.FLOAT:
+ return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null);
+
+ case DataType.DOUBLE:
+ return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null);
+
+ case DataType.BYTEARRAY:
+ return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
+
+ case DataType.BAG:
+ Schema bagSchema = fSchema.schema;
+ List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
+ FieldSchema field;
+ // Find out if we need to throw away the tuple or not.
+ if (removeTupleFromBag(hcatFieldSchema, fSchema)) {
+ field = bagSchema.getField(0).schema.getField(0);
+ } else {
+ field = bagSchema.getField(0);
+ }
+ arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema.getArrayElementSchema().get(0)));
+ return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), "");
+
+ case DataType.TUPLE:
+ List<String> fieldNames = new ArrayList<String>();
+ List<HCatFieldSchema> hcatFSs = new ArrayList<HCatFieldSchema>();
+ HCatSchema structSubSchema = hcatFieldSchema == null ? null : hcatFieldSchema.getStructSubSchema();
+ List<FieldSchema> fields = fSchema.schema.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ FieldSchema fieldSchema = fields.get(i);
+ fieldNames.add(fieldSchema.alias);
+ hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? null : structSubSchema.get(i)));
+ }
+ return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(hcatFSs), "");
+
+ case DataType.MAP: {
+ // Pig's schema contains no type information about a map's keys and
+ // values. So, if it's a new column, assume <string,string>; if it's an existing
+ // column, return whatever is contained in the existing column.
+
+ HCatFieldSchema valFS;
+ List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1);
+
+ if (hcatFieldSchema != null) {
+ return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, hcatFieldSchema.getMapValueSchema(), "");
+ }
+
+ // Column not found in the target table. It's a new column. Its schema is map<string,string>.
+ valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, "");
+ valFSList.add(valFS);
+ return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, new HCatSchema(valFSList), "");
+ }
+
+ default:
+ throw new FrontendException("Unsupported type: " + type + " in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ }
+
+ @Override
+ public void prepareToWrite(RecordWriter writer) throws IOException {
+ this.writer = writer;
+ computedSchema = (HCatSchema) ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).getProperty(COMPUTED_OUTPUT_SCHEMA));
+ }
+
+ @Override
+ public void putNext(Tuple tuple) throws IOException {
+
+ List<Object> outgoing = new ArrayList<Object>(tuple.size());
+
+ int i = 0;
+ for (HCatFieldSchema fSchema : computedSchema.getFields()) {
+ outgoing.add(getJavaObj(tuple.get(i++), fSchema));
+ }
+ try {
+ writer.write(null, new DefaultHCatRecord(outgoing));
+ } catch (InterruptedException e) {
+ throw new BackendException("Error while writing tuple: " + tuple, PigHCatUtil.PIG_EXCEPTION_CODE, e);
+ }
+ }
+
+ private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatException, BackendException {
+ try {
+
+ // The real work-horse. Spend time and energy in this method if there is
+ // a need to keep HCatStorer lean and fast.
+ Type type = hcatFS.getType();
+ switch (type) {
+
+ case BINARY:
+ if (pigObj == null) {
+ return null;
+ }
+ return ((DataByteArray) pigObj).get();
+
+ case STRUCT:
+ if (pigObj == null) {
+ return null;
+ }
+ HCatSchema structSubSchema = hcatFS.getStructSubSchema();
+ // Unwrap the tuple.
+ List<Object> all = ((Tuple) pigObj).getAll();
+ ArrayList<Object> converted = new ArrayList<Object>(all.size());
+ for (int i = 0; i < all.size(); i++) {
+ converted.add(getJavaObj(all.get(i), structSubSchema.get(i)));
+ }
+ return converted;
+
+ case ARRAY:
+ if (pigObj == null) {
+ return null;
+ }
+ // Unwrap the bag.
+ DataBag pigBag = (DataBag) pigObj;
+ HCatFieldSchema tupFS = hcatFS.getArrayElementSchema().get(0);
+ boolean needTuple = tupFS.getType() == Type.STRUCT;
+ List<Object> bagContents = new ArrayList<Object>((int) pigBag.size());
+ Iterator<Tuple> bagItr = pigBag.iterator();
+
+ while (bagItr.hasNext()) {
+ // If there is only one element in the tuple contained in the bag, we throw away the tuple.
+ bagContents.add(getJavaObj(needTuple ? bagItr.next() : bagItr.next().get(0), tupFS));
+
+ }
+ return bagContents;
+ case MAP:
+ if (pigObj == null) {
+ return null;
+ }
+ Map<?, ?> pigMap = (Map<?, ?>) pigObj;
+ Map<Object, Object> typeMap = new HashMap<Object, Object>();
+ for (Entry<?, ?> entry : pigMap.entrySet()) {
+ // the value has a schema and not a FieldSchema
+ typeMap.put(
+ // Schema validation enforces that the Key is a String
+ (String) entry.getKey(),
+ getJavaObj(entry.getValue(), hcatFS.getMapValueSchema().get(0)));
+ }
+ return typeMap;
+ case STRING:
+ case INT:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ return pigObj;
+ case SMALLINT:
+ if (pigObj == null) {
+ return null;
+ }
+ if ((Integer) pigObj < Short.MIN_VALUE || (Integer) pigObj > Short.MAX_VALUE) {
+ throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
+ hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ return ((Integer) pigObj).shortValue();
+ case TINYINT:
+ if (pigObj == null) {
+ return null;
+ }
+ if ((Integer) pigObj < Byte.MIN_VALUE || (Integer) pigObj > Byte.MAX_VALUE) {
+ throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
+ hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ return ((Integer) pigObj).byteValue();
+ case BOOLEAN:
+ // would not pass schema validation anyway
+ throw new BackendException("Incompatible type " + type + " found in hcat table schema: " + hcatFS, PigHCatUtil.PIG_EXCEPTION_CODE);
+ default:
+ throw new BackendException("Unexpected type " + type + " for value " + pigObj + (pigObj == null ? "" : " of class " + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ } catch (BackendException e) {
+ // provide the path to the field in the error message
+ throw new BackendException(
+ (hcatFS.getName() == null ? " " : hcatFS.getName() + ".") + e.getMessage(),
+ e.getCause() == null ? e : e.getCause());
+ }
+ }
+
+ @Override
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+
+ // Need to override this method since the default impl assumes an HDFS-
+ // based location string.
+ return location;
+ }
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) {
+ sign = signature;
+ }
+
+
+ protected void doSchemaValidations(Schema pigSchema, HCatSchema tblSchema) throws FrontendException, HCatException {
+
+ // Iterate through all the elements in the Pig schema and do validations as
+ // dictated by semantics, consulting the HCatSchema of the table when needed.
+
+ for (FieldSchema pigField : pigSchema.getFields()) {
+ HCatFieldSchema hcatField = getColFromSchema(pigField.alias, tblSchema);
+ validateSchema(pigField, hcatField);
+ }
+
+ try {
+ PigHCatUtil.validateHCatTableSchemaFollowsPigRules(tblSchema);
+ } catch (IOException e) {
+ throw new FrontendException("HCatalog schema is not compatible with Pig: " + e.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, e);
+ }
+ }
+
+
+ private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField)
+ throws HCatException, FrontendException {
+ validateAlias(pigField.alias);
+ byte type = pigField.type;
+ if (DataType.isComplex(type)) {
+ switch (type) {
+
+ case DataType.MAP:
+ if (hcatField != null) {
+ if (hcatField.getMapKeyType() != Type.STRING) {
+ throw new FrontendException("Key Type of map must be String " + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ // Map values can be primitive or complex
+ }
+ break;
+
+ case DataType.BAG:
+ HCatSchema arrayElementSchema = hcatField == null ? null : hcatField.getArrayElementSchema();
+ for (FieldSchema innerField : pigField.schema.getField(0).schema.getFields()) {
+ validateSchema(innerField, getColFromSchema(pigField.alias, arrayElementSchema));
+ }
+ break;
+
+ case DataType.TUPLE:
+ HCatSchema structSubSchema = hcatField == null ? null : hcatField.getStructSubSchema();
+ for (FieldSchema innerField : pigField.schema.getFields()) {
+ validateSchema(innerField, getColFromSchema(pigField.alias, structSubSchema));
+ }
+ break;
+
+ default:
+ throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ }
+ }
+
+ private void validateAlias(String alias) throws FrontendException {
+ if (alias == null) {
+ throw new FrontendException("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.", PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ if (alias.matches(".*[A-Z]+.*")) {
+ throw new FrontendException("Column names should all be in lowercase. Invalid name found: " + alias, PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ }
+
+ // Finds a column by name in the HCatSchema; returns null if not found.
+ private HCatFieldSchema getColFromSchema(String alias, HCatSchema tblSchema) {
+ if (tblSchema != null) {
+ for (HCatFieldSchema hcatField : tblSchema.getFields()) {
+ if (hcatField != null && hcatField.getName() != null && hcatField.getName().equalsIgnoreCase(alias)) {
+ return hcatField;
+ }
+ }
+ }
+ // It's a new column.
+ return null;
+ }
+
+ @Override
+ public void cleanupOnFailure(String location, Job job) throws IOException {
+ // No-op.
+ }
+
+ @Override
+ public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException {
+ }
+}
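
For the primitive cases, getHCatFSFromPigFS() above reduces to a direct type mapping (chararray to STRING, int to INT, long to BIGINT, double to DOUBLE, bytearray to BINARY, and so on). A small hypothetical sketch, not part of this commit, showing a Pig schema next to the HCatalog schema it would map to:

    import java.util.Arrays;
    import org.apache.hcatalog.data.schema.HCatFieldSchema;
    import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.pig.impl.logicalLayer.schema.Schema;
    import org.apache.pig.impl.util.Utils;

    public class SchemaMappingSketch {
        public static void main(String[] args) throws Exception {
            // A Pig schema such as this one...
            Schema pigSchema = Utils.getSchemaFromString("name:chararray, age:int, score:double");
            System.out.println("Pig schema:  " + pigSchema);

            // ...maps to HCatalog primitives roughly as follows.
            HCatSchema hcatSchema = new HCatSchema(Arrays.asList(
                new HCatFieldSchema("name", Type.STRING, null),
                new HCatFieldSchema("age", Type.INT, null),
                new HCatFieldSchema("score", Type.DOUBLE, null)));
            System.out.println("HCat schema: " + hcatSchema);
        }
    }
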
Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximLoader.java.broken
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximLoader.java.broken?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximLoader.java.broken (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximLoader.java.broken Fri Sep 6 00:49:14 2013
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;
+import org.apache.hcatalog.mapreduce.HCatEximInputFormat;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Pig {@link LoadFunc} to read data/metadata from hcatalog exported location
+ */
+
+public class HCatEximLoader extends HCatBaseLoader {
+
+ private static final Log LOG = LogFactory.getLog(HCatEximLoader.class);
+
+ private HCatSchema tableSchema;
+ private HCatSchema partitionSchema;
+ private HCatEximInputFormat inputFormat;
+
+ public HCatEximLoader() {
+ LOG.debug("HCatEximLoader ctored");
+ }
+
+ @Override
+ public ResourceSchema getSchema(String location, Job job) throws IOException {
+ LOG.debug("getSchema with location :" + location);
+ if (tableSchema == null) {
+ List<HCatSchema> rv = HCatEximInputFormat.setInput(job, location, null);
+ tableSchema = rv.get(0);
+ partitionSchema = rv.get(1);
+ }
+ LOG.debug("getSchema got schema :" + tableSchema.toString());
+ List<HCatFieldSchema> colsPlusPartKeys = new ArrayList<HCatFieldSchema>();
+ colsPlusPartKeys.addAll(tableSchema.getFields());
+ colsPlusPartKeys.addAll(partitionSchema.getFields());
+ outputSchema = new HCatSchema(colsPlusPartKeys);
+ return PigHCatUtil.getResourceSchema(outputSchema);
+ }
+
+ @Override
+ public String[] getPartitionKeys(String location, Job job) throws IOException {
+ LOG.warn("getPartitionKeys with location :" + location);
+ /*
+ if (tableSchema == null) {
+ List<HCatSchema> rv = HCatEximInputFormat.setInput(job, location, null);
+ tableSchema = rv.get(0);
+ partitionSchema = rv.get(1);
+ }
+ return partitionSchema.getFieldNames().toArray(new String[0]);
+ */
+ return null;
+ }
+
+ @Override
+ public void setPartitionFilter(Expression partitionFilter) throws IOException {
+ LOG.debug("setPartitionFilter with filter :" + partitionFilter.toString());
+ }
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ LOG.debug("setLocation with location :" + location);
+ List<HCatSchema> rv = HCatEximInputFormat.setInput(job, location, null);
+ tableSchema = rv.get(0);
+ partitionSchema = rv.get(1);
+ List<HCatFieldSchema> colsPlusPartKeys = new ArrayList<HCatFieldSchema>();
+ colsPlusPartKeys.addAll(tableSchema.getFields());
+ colsPlusPartKeys.addAll(partitionSchema.getFields());
+ outputSchema = new HCatSchema(colsPlusPartKeys);
+ UDFContext udfContext = UDFContext.getUDFContext();
+ Properties props = udfContext.getUDFProperties(this.getClass(),
+ new String[] {signature});
+ RequiredFieldList requiredFieldsInfo =
+ (RequiredFieldList) props.get(PRUNE_PROJECTION_INFO);
+ if (requiredFieldsInfo != null) {
+ ArrayList<HCatFieldSchema> fcols = new ArrayList<HCatFieldSchema>();
+ for (RequiredField rf : requiredFieldsInfo.getFields()) {
+ fcols.add(tableSchema.getFields().get(rf.getIndex()));
+ }
+ outputSchema = new HCatSchema(fcols);
+ try {
+ HCatBaseInputFormat.setOutputSchema(job, outputSchema);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+
+ @Override
+ public InputFormat getInputFormat() throws IOException {
+ if (inputFormat == null) {
+ inputFormat = new HCatEximInputFormat();
+ }
+ return inputFormat;
+ }
+
+}
Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximStorer.java.broken
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximStorer.java.broken?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximStorer.java.broken (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatEximStorer.java.broken Fri Sep 6 00:49:14 2013
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatEximOutputCommitter;
+import org.apache.hcatalog.mapreduce.HCatEximOutputFormat;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * HCatEximStorer.
+ *
+ */
+
+public class HCatEximStorer extends HCatBaseStorer {
+
+ private static final Log LOG = LogFactory.getLog(HCatEximStorer.class);
+
+ private final String outputLocation;
+
+ public HCatEximStorer(String outputLocation) throws Exception {
+ this(outputLocation, null, null);
+ }
+
+ public HCatEximStorer(String outputLocation, String partitionSpec) throws Exception {
+ this(outputLocation, partitionSpec, null);
+ }
+
+ public HCatEximStorer(String outputLocation, String partitionSpec, String schema)
+ throws Exception {
+ super(partitionSpec, schema);
+ this.outputLocation = outputLocation;
+ LOG.debug("HCatEximStorer called");
+ }
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ LOG.debug("getOutputFormat called");
+ return new HCatEximOutputFormat();
+ }
+
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ LOG.debug("setStoreLocation called with :" + location);
+ String[] userStr = location.split("\\.");
+ String dbname = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+ String tablename = null;
+ if (userStr.length == 2) {
+ dbname = userStr[0];
+ tablename = userStr[1];
+ } else {
+ tablename = userStr[0];
+ }
+ Properties p = UDFContext.getUDFContext()
+ .getUDFProperties(this.getClass(), new String[] {sign});
+ Configuration config = job.getConfiguration();
+ if (!HCatUtil.checkJobContextIfRunningFromBackend(job)) {
+ Schema schema = (Schema) ObjectSerializer.deserialize(p.getProperty(PIG_SCHEMA));
+ if (schema != null) {
+ pigSchema = schema;
+ }
+ if (pigSchema == null) {
+ throw new FrontendException("Schema for data cannot be determined.",
+ PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ HCatSchema hcatTblSchema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+ try {
+ doSchemaValidations(pigSchema, hcatTblSchema);
+ } catch (HCatException he) {
+ throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+ }
+
+ List<HCatFieldSchema> hcatFields = new ArrayList<HCatFieldSchema>();
+ List<String> partVals = new ArrayList<String>();
+ for (String key : partitionKeys) {
+ hcatFields.add(new HCatFieldSchema(key, HCatFieldSchema.Type.STRING, ""));
+ partVals.add(partitions.get(key));
+ }
+
+ HCatSchema outputSchema = convertPigSchemaToHCatSchema(pigSchema,
+ hcatTblSchema);
+ LOG.debug("Pig Schema '" + pigSchema.toString() + "' was converted to HCatSchema '"
+ + outputSchema);
+ HCatEximOutputFormat.setOutput(job,
+ dbname, tablename,
+ outputLocation,
+ new HCatSchema(hcatFields),
+ partVals,
+ outputSchema);
+ p.setProperty(COMPUTED_OUTPUT_SCHEMA, ObjectSerializer.serialize(outputSchema));
+ p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+ config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+ if (config.get(HCatConstants.HCAT_KEY_HIVE_CONF) != null) {
+ p.setProperty(HCatConstants.HCAT_KEY_HIVE_CONF,
+ config.get(HCatConstants.HCAT_KEY_HIVE_CONF));
+ }
+ } else {
+ config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+ p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+ if (p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF) != null) {
+ config.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+ p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF));
+ }
+ }
+ }
+
+ @Override
+ public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
+ if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
+ //In local mode, mapreduce will not call OutputCommitter.cleanupJob.
+ //Calling it from here so that the partition publish happens.
+ //This call needs to be removed after MAPREDUCE-1447 is fixed.
+ new HCatEximOutputCommitter(job,null).cleanupJob(job);
+ }
+ }
+}
Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatContext;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.Pair;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.pig.Expression;
+import org.apache.pig.Expression.BinaryExpression;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Pig {@link org.apache.pig.LoadFunc} to read data from HCat
+ */
+
+public class HCatLoader extends HCatBaseLoader {
+
+ private static final String PARTITION_FILTER = "partition.filter"; // for future use
+
+ private HCatInputFormat hcatInputFormat = null;
+ private String dbName;
+ private String tableName;
+ private String hcatServerUri;
+ private String partitionFilterString;
+ private final PigHCatUtil phutil = new PigHCatUtil();
+
+ // Signature for wrapped loader, see comments in LoadFuncBasedInputDriver.initialize
+ final public static String INNER_SIGNATURE = "hcatloader.inner.signature";
+ final public static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature";
+ // A hash map which stores job credentials. The key is a signature passed by Pig, which is
+ // unique to the load func and input file name (table, in our case).
+ private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
+
+ @Override
+ public InputFormat<?, ?> getInputFormat() throws IOException {
+ if (hcatInputFormat == null) {
+ hcatInputFormat = new HCatInputFormat();
+ }
+ return hcatInputFormat;
+ }
+
+ @Override
+ public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+ return location;
+ }
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ HCatContext.INSTANCE.setConf(job.getConfiguration()).getConf().get()
+ .setBoolean(HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true);
+
+ UDFContext udfContext = UDFContext.getUDFContext();
+ Properties udfProps = udfContext.getUDFProperties(this.getClass(),
+ new String[]{signature});
+ job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + signature);
+ Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
+ dbName = dbTablePair.first;
+ tableName = dbTablePair.second;
+
+ RequiredFieldList requiredFieldsInfo = (RequiredFieldList) udfProps
+ .get(PRUNE_PROJECTION_INFO);
+ // get the partitionFilterString stored in the UDFContext - it would have
+ // been stored there by an earlier call to setPartitionFilter.
+ // Call setInput on HCatInputFormat only in the frontend because internally
+ // it makes calls to the hcat server - we don't want these to happen in
+ // the backend. In the hadoop front end the mapred.task.id property will
+ // not be set in the Configuration.
+ if (udfProps.containsKey(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET)) {
+ for (Enumeration<Object> emr = udfProps.keys(); emr.hasMoreElements(); ) {
+ PigHCatUtil.getConfigFromUDFProperties(udfProps,
+ job.getConfiguration(), emr.nextElement().toString());
+ }
+ if (!HCatUtil.checkJobContextIfRunningFromBackend(job)) {
+ // Combine credentials; credentials from the job take precedence for freshness.
+ Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + signature);
+ crd.addAll(job.getCredentials());
+ job.getCredentials().addAll(crd);
+ }
+ } else {
+ Job clone = new Job(job.getConfiguration());
+ HCatInputFormat.setInput(job, dbName, tableName).setFilter(getPartitionFilterString());
+
+ // We will store all the new/changed properties of the job in the
+ // UDF context, so that the HCatInputFormat.setInput method need not
+ // be called many times.
+ for (Entry<String, String> keyValue : job.getConfiguration()) {
+ String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
+ if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
+ udfProps.put(keyValue.getKey(), keyValue.getValue());
+ }
+ }
+ udfProps.put(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET, true);
+
+ // Store credentials in a private hash map and not the udf context to
+ // make sure they are not public.
+ Credentials crd = new Credentials();
+ crd.addAll(job.getCredentials());
+ jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + signature, crd);
+ }
+
+ // Need to also push projections by calling setOutputSchema on
+ // HCatInputFormat - we have to get the RequiredFields information
+ // from the UdfContext, translate it to an HCatSchema and then pass it on.
+ // The reason we do this here is because setLocation() is called by
+ // Pig runtime at InputFormat.getSplits() and
+ // InputFormat.createRecordReader() time - we are not sure when
+ // HCatInputFormat needs to know about pruned projections - so doing it
+ // here will ensure we communicate to HCatInputFormat about pruned
+ // projections at getSplits() and createRecordReader() time
+
+ if (requiredFieldsInfo != null) {
+ // convert to hcatschema and pass to HCatInputFormat
+ try {
+ outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(), signature, this.getClass());
+ HCatInputFormat.setOutputSchema(job, outputSchema);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ } else {
+ // else - this means pig's optimizer never invoked the pushProjection
+ // method - so we need all fields and hence we should not call the
+ // setOutputSchema on HCatInputFormat
+ if (HCatUtil.checkJobContextIfRunningFromBackend(job)) {
+ try {
+ HCatSchema hcatTableSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA);
+ outputSchema = hcatTableSchema;
+ HCatInputFormat.setOutputSchema(job, outputSchema);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public String[] getPartitionKeys(String location, Job job)
+ throws IOException {
+ Table table = phutil.getTable(location,
+ hcatServerUri != null ? hcatServerUri : PigHCatUtil.getHCatServerUri(job),
+ PigHCatUtil.getHCatServerPrincipal(job));
+ List<FieldSchema> tablePartitionKeys = table.getPartitionKeys();
+ String[] partitionKeys = new String[tablePartitionKeys.size()];
+ for (int i = 0; i < tablePartitionKeys.size(); i++) {
+ partitionKeys[i] = tablePartitionKeys.get(i).getName();
+ }
+ return partitionKeys;
+ }
+
+ @Override
+ public ResourceSchema getSchema(String location, Job job) throws IOException {
+ HCatContext.INSTANCE.setConf(job.getConfiguration()).getConf().get()
+ .setBoolean(HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true);
+
+ Table table = phutil.getTable(location,
+ hcatServerUri != null ? hcatServerUri : PigHCatUtil.getHCatServerUri(job),
+ PigHCatUtil.getHCatServerPrincipal(job));
+ HCatSchema hcatTableSchema = HCatUtil.getTableSchemaWithPtnCols(table);
+ try {
+ PigHCatUtil.validateHCatTableSchemaFollowsPigRules(hcatTableSchema);
+ } catch (IOException e) {
+ throw new PigException(
+ "Table schema incompatible for reading through HCatLoader :" + e.getMessage()
+ + ";[Table schema was " + hcatTableSchema.toString() + "]"
+ , PigHCatUtil.PIG_EXCEPTION_CODE, e);
+ }
+ storeInUDFContext(signature, HCatConstants.HCAT_TABLE_SCHEMA, hcatTableSchema);
+ outputSchema = hcatTableSchema;
+ return PigHCatUtil.getResourceSchema(hcatTableSchema);
+ }
+
+ @Override
+ public void setPartitionFilter(Expression partitionFilter) throws IOException {
+ // convert the partition filter expression into a string expected by
+ // hcat and pass it in setLocation()
+
+ partitionFilterString = getHCatComparisonString(partitionFilter);
+
+ // store this in the udf context so we can get it later
+ storeInUDFContext(signature,
+ PARTITION_FILTER, partitionFilterString);
+ }
+
+ /**
+ * Get statistics about the data to be loaded. Only input data size is implemented at this time.
+ */
+ @Override
+ public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+ try {
+ ResourceStatistics stats = new ResourceStatistics();
+ InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(
+ job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
+ stats.setmBytes(getSizeInBytes(inputJobInfo) / 1024 / 1024);
+ return stats;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private String getPartitionFilterString() {
+ if (partitionFilterString == null) {
+ Properties props = UDFContext.getUDFContext().getUDFProperties(
+ this.getClass(), new String[]{signature});
+ partitionFilterString = props.getProperty(PARTITION_FILTER);
+ }
+ return partitionFilterString;
+ }
+
+ private String getHCatComparisonString(Expression expr) {
+ if (expr instanceof BinaryExpression) {
+ // call getHCatComparisonString on lhs and rhs, and join the
+ // results with the OpType string
+
+ // we can just use OpType.toString() on all Expression types except
+ // Equal and NotEqual, since Equal has '==' in toString() and
+ // we need '='
+ String opStr = null;
+ switch (expr.getOpType()) {
+ case OP_EQ:
+ opStr = " = ";
+ break;
+ default:
+ opStr = expr.getOpType().toString();
+ }
+ BinaryExpression be = (BinaryExpression) expr;
+ return "(" + getHCatComparisonString(be.getLhs()) +
+ opStr +
+ getHCatComparisonString(be.getRhs()) + ")";
+ } else {
+ // should be a constant or column
+ return expr.toString();
+ }
+ }
+
+}
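
In practice HCatLoader is driven from Pig Latin. A minimal usage sketch (assuming a reachable metastore and a hypothetical partitioned table default.web_logs with a string partition column ds; neither is part of this commit):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class HCatLoaderUsageSketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE);
            pig.registerQuery("A = LOAD 'default.web_logs' USING org.apache.hcatalog.pig.HCatLoader();");
            // A filter on a partition column can be pushed down to the metastore
            // through setPartitionFilter()/setLocation() as implemented above.
            pig.registerQuery("B = FILTER A BY ds == '20130906';");
            pig.store("B", "/tmp/web_logs_20130906");
        }
    }
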
Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatContext;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * HCatStorer.
+ *
+ */
+
+public class HCatStorer extends HCatBaseStorer {
+
+ // Signature for wrapped storer, see comments in LoadFuncBasedInputDriver.initialize
+ final public static String INNER_SIGNATURE = "hcatstorer.inner.signature";
+ final public static String INNER_SIGNATURE_PREFIX = "hcatstorer_inner_signature";
+ // A hash map which stores job credentials. The key is a signature passed by Pig, which is
+ // unique to the store func and output file name (table, in our case).
+ private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
+
+
+ public HCatStorer(String partSpecs, String schema) throws Exception {
+ super(partSpecs, schema);
+ }
+
+ public HCatStorer(String partSpecs) throws Exception {
+ this(partSpecs, null);
+ }
+
+ public HCatStorer() throws Exception {
+ this(null, null);
+ }
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ return new HCatOutputFormat();
+ }
+
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ HCatContext.INSTANCE.setConf(job.getConfiguration()).getConf().get()
+ .setBoolean(HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, false);
+
+ Configuration config = job.getConfiguration();
+ config.set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
+ Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
+ this.getClass(), new String[]{sign});
+ String[] userStr = location.split("\\.");
+
+ if (udfProps.containsKey(HCatConstants.HCAT_PIG_STORER_LOCATION_SET)) {
+ for (Enumeration<Object> emr = udfProps.keys(); emr.hasMoreElements(); ) {
+ PigHCatUtil.getConfigFromUDFProperties(udfProps, config, emr.nextElement().toString());
+ }
+ Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + sign);
+ if (crd != null) {
+ job.getCredentials().addAll(crd);
+ }
+ } else {
+ Job clone = new Job(job.getConfiguration());
+ OutputJobInfo outputJobInfo;
+ if (userStr.length == 2) {
+ outputJobInfo = OutputJobInfo.create(userStr[0], userStr[1], partitions);
+ } else if (userStr.length == 1) {
+ outputJobInfo = OutputJobInfo.create(null, userStr[0], partitions);
+ } else {
+ throw new FrontendException("location " + location
+ + " is invalid. It must be of the form [db.]table",
+ PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ Schema schema = (Schema) ObjectSerializer.deserialize(udfProps.getProperty(PIG_SCHEMA));
+ if (schema != null) {
+ pigSchema = schema;
+ }
+ if (pigSchema == null) {
+ throw new FrontendException(
+ "Schema for data cannot be determined.",
+ PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ String externalLocation = (String) udfProps.getProperty(HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION);
+ if (externalLocation != null) {
+ outputJobInfo.setLocation(externalLocation);
+ }
+ try {
+ HCatOutputFormat.setOutput(job, outputJobInfo);
+ } catch (HCatException he) {
+ // pass the message to the user - essentially something about the table
+ // information passed to HCatOutputFormat was not right
+ throw new PigException(he.getMessage(),
+ PigHCatUtil.PIG_EXCEPTION_CODE, he);
+ }
+ HCatSchema hcatTblSchema = HCatOutputFormat.getTableSchema(job);
+ try {
+ doSchemaValidations(pigSchema, hcatTblSchema);
+ } catch (HCatException he) {
+ throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+ }
+ computedSchema = convertPigSchemaToHCatSchema(pigSchema, hcatTblSchema);
+ HCatOutputFormat.setSchema(job, computedSchema);
+ udfProps.setProperty(COMPUTED_OUTPUT_SCHEMA, ObjectSerializer.serialize(computedSchema));
+
+ // We will store all the new/changed properties of the job in the
+ // UDF context, so that the HCatOutputFormat.setOutput and setSchema
+ // methods need not be called many times.
+ for (Entry<String, String> keyValue : job.getConfiguration()) {
+ String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
+ if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
+ udfProps.put(keyValue.getKey(), keyValue.getValue());
+ }
+ }
+ // Store credentials in a private hash map and not the udf context to
+ // make sure they are not public.
+ jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + sign, job.getCredentials());
+ udfProps.put(HCatConstants.HCAT_PIG_STORER_LOCATION_SET, true);
+ }
+ }
+
+ @Override
+ public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
+ HCatHadoopShims.Instance.get().commitJob(getOutputFormat(), job);
+ }
+
+ @Override
+ public void cleanupOnFailure(String location, Job job) throws IOException {
+ HCatHadoopShims.Instance.get().abortJob(getOutputFormat(), job);
+ }
+}
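
Correspondingly, HCatStorer is invoked from the STORE side of a Pig script. A minimal sketch (the input path, table name, and partition spec are hypothetical; the target table must already exist because doSchemaValidations() checks the Pig schema against it):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class HCatStorerUsageSketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE);
            pig.registerQuery("A = LOAD '/tmp/raw_logs' AS (name:chararray, age:int);");
            // The constructor argument is the static partition spec parsed by
            // HCatBaseStorer(String partSpecs, String schema).
            pig.registerQuery("STORE A INTO 'default.web_logs' "
                + "USING org.apache.hcatalog.pig.HCatStorer('ds=20130906');");
        }
    }
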
Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,488 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.Pair;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.pig.LoadPushDown.RequiredField;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PigHCatUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PigHCatUtil.class);
+
+ static final int PIG_EXCEPTION_CODE = 1115; // http://wiki.apache.org/pig/PigErrorHandlingFunctionalSpecification#Error_codes
+ private static final String DEFAULT_DB = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+
+ private final Map<Pair<String, String>, Table> hcatTableCache =
+ new HashMap<Pair<String, String>, Table>();
+
+ private static final TupleFactory tupFac = TupleFactory.getInstance();
+
+ private static boolean pigHasBooleanSupport = false;
+
+ /**
+ * Determine if the current Pig version supports boolean columns. This works around a
+ * dependency conflict preventing HCatalog from requiring a version of Pig with boolean
+ * field support and should be removed once HCATALOG-466 has been resolved.
+ */
+ static {
+ // DETAILS:
+ //
+ // PIG-1429 added support for boolean fields, which shipped in 0.10.0;
+ // this version of Pig depends on antlr 3.4.
+ //
+ // HCatalog depends heavily on Hive, which at this time uses antlr 3.0.1.
+ //
+ // antlr 3.0.1 and 3.4 are incompatible, so Pig 0.10.0 and Hive cannot be depended on in the
+ // same project. Pig 0.8.0 did not use antlr for its parser and can coexist with Hive,
+ // so that Pig version is depended on by HCatalog at this time.
+ try {
+ Schema schema = Utils.getSchemaFromString("myBooleanField: boolean");
+ pigHasBooleanSupport = (schema.getField("myBooleanField").type == DataType.BOOLEAN);
+ } catch (Throwable e) {
+ // This Pig version cannot parse a boolean field; leave pigHasBooleanSupport false.
+ }
+
+ if (!pigHasBooleanSupport) {
+ LOG.info("This version of Pig does not support boolean fields. To enable "
+ + "boolean-to-integer conversion, set the "
+ + HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER
+ + "=true configuration parameter.");
+ }
+ }
+
+ static public Pair<String, String> getDBTableNames(String location) throws IOException {
+ // The location string is of the form <database name>.<table name> or just
+ // <table name>; parse it so the caller can pass the names on to HCatInputFormat.
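+ // Example (illustrative names): "default.page_views" parses to ("default", "page_views"),
+ // while a bare "page_views" is resolved against the default database.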
+
+ try {
+ return HCatUtil.getDbAndTableName(location);
+ } catch (IOException e) {
+ String locationErrMsg = "The input location in the load statement " +
+ "should be of the form " +
+ "<database name>.<table name> or <table name>. Got " + location;
+ throw new PigException(locationErrMsg, PIG_EXCEPTION_CODE);
+ }
+ }
+
+ static public String getHCatServerUri(Job job) {
+
+ return job.getConfiguration().get(HiveConf.ConfVars.METASTOREURIS.varname);
+ }
+
+ static public String getHCatServerPrincipal(Job job) {
+
+ return job.getConfiguration().get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
+ }
+
+ private static HiveMetaStoreClient getHiveMetaClient(String serverUri,
+ String serverKerberosPrincipal, Class<?> clazz) throws Exception {
+ HiveConf hiveConf = new HiveConf(clazz);
+
+ if (serverUri != null) {
+ hiveConf.set("hive.metastore.local", "false");
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, serverUri.trim());
+ }
+
+ if (serverKerberosPrincipal != null) {
+ hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
+ hiveConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, serverKerberosPrincipal);
+ }
+
+ try {
+ return HCatUtil.getHiveClient(hiveConf);
+ } catch (Exception e) {
+ throw new Exception("Could not instantiate a HiveMetaStoreClient connecting to server uri:[" + serverUri + "]", e);
+ }
+ }
+
+
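+ /**
+ * Builds an HCatSchema containing only the columns requested by Pig's projection
+ * push-down, looking each required field up by index in the full table schema cached
+ * in the UDFContext properties. Returns null when no required-field list is given.
+ */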
+ HCatSchema getHCatSchema(List<RequiredField> fields, String signature, Class<?> classForUDFCLookup) throws IOException {
+ if (fields == null) {
+ return null;
+ }
+
+ Properties props = UDFContext.getUDFContext().getUDFProperties(
+ classForUDFCLookup, new String[]{signature});
+ HCatSchema hcatTableSchema = (HCatSchema) props.get(HCatConstants.HCAT_TABLE_SCHEMA);
+
+ ArrayList<HCatFieldSchema> fcols = new ArrayList<HCatFieldSchema>();
+ for (RequiredField rf : fields) {
+ fcols.add(hcatTableSchema.getFields().get(rf.getIndex()));
+ }
+ return new HCatSchema(fcols);
+ }
+
+ public Table getTable(String location, String hcatServerUri, String hcatServerPrincipal) throws IOException {
+ Pair<String, String> loc_server = new Pair<String, String>(location, hcatServerUri);
+ Table hcatTable = hcatTableCache.get(loc_server);
+ if (hcatTable != null) {
+ return hcatTable;
+ }
+
+ Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
+ String dbName = dbTablePair.first;
+ String tableName = dbTablePair.second;
+ Table table = null;
+ HiveMetaStoreClient client = null;
+ try {
+ client = getHiveMetaClient(hcatServerUri, hcatServerPrincipal, PigHCatUtil.class);
+ table = HCatUtil.getTable(client, dbName, tableName);
+ } catch (NoSuchObjectException nsoe) {
+ // surface a cleaner error message to the Pig front end
+ throw new PigException("Table not found: " + nsoe.getMessage(), PIG_EXCEPTION_CODE);
+ } catch (Exception e) {
+ throw new IOException(e);
+ } finally {
+ HCatUtil.closeHiveClientQuietly(client);
+ }
+ hcatTableCache.put(loc_server, table);
+ return table;
+ }
+
+ public static ResourceSchema getResourceSchema(HCatSchema hcatSchema) throws IOException {
+ List<ResourceFieldSchema> rfSchemaList = new ArrayList<ResourceFieldSchema>();
+ for (HCatFieldSchema hfs : hcatSchema.getFields()) {
+ rfSchemaList.add(getResourceSchemaFromFieldSchema(hfs));
+ }
+ ResourceSchema rSchema = new ResourceSchema();
+ rSchema.setFields(rfSchemaList.toArray(new ResourceFieldSchema[0]));
+ return rSchema;
+ }
+
+ private static ResourceFieldSchema getResourceSchemaFromFieldSchema(HCatFieldSchema hfs)
+ throws IOException {
+ ResourceFieldSchema rfSchema;
+ // if we are dealing with a bag or tuple column - need to worry about subschema
+ if (hfs.getType() == Type.STRUCT) {
+ rfSchema = new ResourceFieldSchema()
+ .setName(hfs.getName())
+ .setDescription(hfs.getComment())
+ .setType(getPigType(hfs))
+ .setSchema(getTupleSubSchema(hfs));
+ } else if (hfs.getType() == Type.ARRAY) {
+ rfSchema = new ResourceFieldSchema()
+ .setName(hfs.getName())
+ .setDescription(hfs.getComment())
+ .setType(getPigType(hfs))
+ .setSchema(getBagSubSchema(hfs));
+ } else {
+ rfSchema = new ResourceFieldSchema()
+ .setName(hfs.getName())
+ .setDescription(hfs.getComment())
+ .setType(getPigType(hfs))
+ .setSchema(null); // no munging inner-schemas
+ }
+ return rfSchema;
+ }
+
+ protected static ResourceSchema getBagSubSchema(HCatFieldSchema hfs) throws IOException {
+ // There are two cases: array<Type> and array<struct<...>>. In either case the element
+ // type of the array is represented by a tuple field schema inside the bag's field
+ // schema. The struct case translates naturally to the tuple; for array<Type> we
+ // simulate the tuple by wrapping the single field in a tuple.
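+ // Illustrative mapping (inner field names depend on the HCAT_PIG_INNER_* properties):
+ // array<int> -> bag { tuple ( int ) }
+ // array<struct<a:int,b:string>> -> bag { tuple ( a:int, b:chararray ) }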
+
+ Properties props = UDFContext.getUDFContext().getClientSystemProps();
+ String innerTupleName = HCatConstants.HCAT_PIG_INNER_TUPLE_NAME_DEFAULT;
+ if (props != null && props.containsKey(HCatConstants.HCAT_PIG_INNER_TUPLE_NAME)) {
+ innerTupleName = props.getProperty(HCatConstants.HCAT_PIG_INNER_TUPLE_NAME)
+ .replaceAll("FIELDNAME", hfs.getName());
+ }
+ String innerFieldName = HCatConstants.HCAT_PIG_INNER_FIELD_NAME_DEFAULT;
+ if (props != null && props.containsKey(HCatConstants.HCAT_PIG_INNER_FIELD_NAME)) {
+ innerFieldName = props.getProperty(HCatConstants.HCAT_PIG_INNER_FIELD_NAME)
+ .replaceAll("FIELDNAME", hfs.getName());
+ }
+
+ ResourceFieldSchema[] bagSubFieldSchemas = new ResourceFieldSchema[1];
+ bagSubFieldSchemas[0] = new ResourceFieldSchema().setName(innerTupleName)
+ .setDescription("The tuple in the bag")
+ .setType(DataType.TUPLE);
+ HCatFieldSchema arrayElementFieldSchema = hfs.getArrayElementSchema().get(0);
+ if (arrayElementFieldSchema.getType() == Type.STRUCT) {
+ bagSubFieldSchemas[0].setSchema(getTupleSubSchema(arrayElementFieldSchema));
+ } else if (arrayElementFieldSchema.getType() == Type.ARRAY) {
+ ResourceSchema s = new ResourceSchema();
+ List<ResourceFieldSchema> lrfs = Arrays.asList(getResourceSchemaFromFieldSchema(arrayElementFieldSchema));
+ s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
+ bagSubFieldSchemas[0].setSchema(s);
+ } else {
+ ResourceFieldSchema[] innerTupleFieldSchemas = new ResourceFieldSchema[1];
+ innerTupleFieldSchemas[0] = new ResourceFieldSchema().setName(innerFieldName)
+ .setDescription("The inner field in the tuple in the bag")
+ .setType(getPigType(arrayElementFieldSchema))
+ .setSchema(null); // the element type is not a tuple - so no subschema
+ bagSubFieldSchemas[0].setSchema(new ResourceSchema().setFields(innerTupleFieldSchemas));
+ }
+ return new ResourceSchema().setFields(bagSubFieldSchemas);
+ }
+
+ private static ResourceSchema getTupleSubSchema(HCatFieldSchema hfs) throws IOException {
+ // for each struct subfield, create equivalent ResourceFieldSchema
+ ResourceSchema s = new ResourceSchema();
+ List<ResourceFieldSchema> lrfs = new ArrayList<ResourceFieldSchema>();
+ for (HCatFieldSchema subField : hfs.getStructSubSchema().getFields()) {
+ lrfs.add(getResourceSchemaFromFieldSchema(subField));
+ }
+ s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
+ return s;
+ }
+
+ /**
+ * @param hfs the field schema of the column
+ * @return the corresponding Pig type
+ * @throws IOException if the HCatalog type has no supported Pig equivalent
+ */
+ static public byte getPigType(HCatFieldSchema hfs) throws IOException {
+ return getPigType(hfs.getType());
+ }
+
+ static public byte getPigType(Type type) throws IOException {
+ if (type == Type.STRING) {
+ return DataType.CHARARRAY;
+ }
+
+ if ((type == Type.INT) || (type == Type.SMALLINT) || (type == Type.TINYINT)) {
+ return DataType.INTEGER;
+ }
+
+ if (type == Type.ARRAY) {
+ return DataType.BAG;
+ }
+
+ if (type == Type.STRUCT) {
+ return DataType.TUPLE;
+ }
+
+ if (type == Type.MAP) {
+ return DataType.MAP;
+ }
+
+ if (type == Type.BIGINT) {
+ return DataType.LONG;
+ }
+
+ if (type == Type.FLOAT) {
+ return DataType.FLOAT;
+ }
+
+ if (type == Type.DOUBLE) {
+ return DataType.DOUBLE;
+ }
+
+ if (type == Type.BINARY) {
+ return DataType.BYTEARRAY;
+ }
+
+ if (type == Type.BOOLEAN && pigHasBooleanSupport) {
+ return DataType.BOOLEAN;
+ }
+
+ throw new PigException("HCatalog column type '" + type.toString()
+ + "' is not supported in Pig as a column type", PIG_EXCEPTION_CODE);
+ }
+
+ public static Tuple transformToTuple(HCatRecord hr, HCatSchema hs) throws Exception {
+ if (hr == null) {
+ return null;
+ }
+ return transformToTuple(hr.getAll(), hs);
+ }
+
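+ /**
+ * Converts a single HCatalog field value to its Pig representation: binary becomes a
+ * DataByteArray, a struct becomes a Tuple, an array becomes a DataBag, a map becomes a
+ * string-keyed java.util.Map, and all other (primitive) values are passed through unchanged.
+ */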
+ @SuppressWarnings("unchecked")
+ public static Object extractPigObject(Object o, HCatFieldSchema hfs) throws Exception {
+ Object result;
+ Type itemType = hfs.getType();
+ switch (itemType) {
+ case BINARY:
+ result = (o == null) ? null : new DataByteArray((byte[]) o);
+ break;
+ case STRUCT:
+ result = transformToTuple((List<Object>) o, hfs);
+ break;
+ case ARRAY:
+ result = transformToBag((List<? extends Object>) o, hfs);
+ break;
+ case MAP:
+ result = transformToPigMap((Map<Object, Object>) o, hfs);
+ break;
+ default:
+ result = o;
+ break;
+ }
+ return result;
+ }
+
+ private static Tuple transformToTuple(List<? extends Object> objList, HCatFieldSchema hfs) throws Exception {
+ try {
+ return transformToTuple(objList, hfs.getStructSubSchema());
+ } catch (Exception e) {
+ if (hfs.getType() != Type.STRUCT) {
+ throw new Exception("Expected Struct type, got " + hfs.getType(), e);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ private static Tuple transformToTuple(List<? extends Object> objList, HCatSchema hs) throws Exception {
+ if (objList == null) {
+ return null;
+ }
+ Tuple t = tupFac.newTuple(objList.size());
+ List<HCatFieldSchema> subFields = hs.getFields();
+ for (int i = 0; i < subFields.size(); i++) {
+ t.set(i, extractPigObject(objList.get(i), subFields.get(i)));
+ }
+ return t;
+ }
+
+ private static Map<String, Object> transformToPigMap(Map<Object, Object> map, HCatFieldSchema hfs) throws Exception {
+ if (map == null) {
+ return null;
+ }
+
+ Map<String, Object> result = new HashMap<String, Object>();
+ for (Entry<Object, Object> entry : map.entrySet()) {
+ // Pig map keys must be strings, so convert the key with toString().
+ result.put(entry.getKey().toString(), extractPigObject(entry.getValue(), hfs.getMapValueSchema().get(0)));
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static DataBag transformToBag(List<? extends Object> list, HCatFieldSchema hfs) throws Exception {
+ if (list == null) {
+ return null;
+ }
+
+ HCatFieldSchema elementSubFieldSchema = hfs.getArrayElementSchema().getFields().get(0);
+ DataBag db = new DefaultDataBag();
+ for (Object o : list) {
+ Tuple tuple;
+ if (elementSubFieldSchema.getType() == Type.STRUCT) {
+ tuple = transformToTuple((List<Object>) o, elementSubFieldSchema);
+ } else {
+ // bags always contain tuples
+ tuple = tupFac.newTuple(extractPigObject(o, elementSubFieldSchema));
+ }
+ db.add(tuple);
+ }
+ return db;
+ }
+
+
+ private static void validateHCatSchemaFollowsPigRules(HCatSchema tblSchema) throws PigException {
+ for (HCatFieldSchema hcatField : tblSchema.getFields()) {
+ validateHcatFieldFollowsPigRules(hcatField);
+ }
+ }
+
+ private static void validateHcatFieldFollowsPigRules(HCatFieldSchema hcatField) throws PigException {
+ try {
+ Type hType = hcatField.getType();
+ switch (hType) {
+ case BOOLEAN:
+ if (!pigHasBooleanSupport) {
+ throw new PigException("Incompatible type found in HCat table schema: "
+ + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
+ break;
+ case ARRAY:
+ validateHCatSchemaFollowsPigRules(hcatField.getArrayElementSchema());
+ break;
+ case STRUCT:
+ validateHCatSchemaFollowsPigRules(hcatField.getStructSubSchema());
+ break;
+ case MAP:
+ // Pig map keys can only be strings; log that a non-string key will be converted.
+ if (hcatField.getMapKeyType() != Type.STRING) {
+ LOG.info("Converting non-String key of map " + hcatField.getName() + " from "
+ + hcatField.getMapKeyType() + " to String.");
+ }
+ validateHCatSchemaFollowsPigRules(hcatField.getMapValueSchema());
+ break;
+ }
+ } catch (HCatException e) {
+ throw new PigException("Incompatible type found in hcat table schema: " + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE, e);
+ }
+ }
+
+
+ public static void validateHCatTableSchemaFollowsPigRules(HCatSchema hcatTableSchema) throws IOException {
+ validateHCatSchemaFollowsPigRules(hcatTableSchema);
+ }
+
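+ /**
+ * Copies the named property from the UDFContext properties into the job configuration,
+ * if present; typically used to restore front-end settings on the back end.
+ */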
+ public static void getConfigFromUDFProperties(Properties p, Configuration config, String propName) {
+ if (p.getProperty(propName) != null) {
+ config.set(propName, p.getProperty(propName));
+ }
+ }
+
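+ /**
+ * Saves the named property from the job configuration into the UDFContext properties,
+ * if set, so it can later be restored with getConfigFromUDFProperties.
+ */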
+ public static void saveConfigIntoUDFProperties(Properties p, Configuration config, String propName) {
+ if (config.get(propName) != null) {
+ p.setProperty(propName, config.get(propName));
+ }
+ }
+
+}
Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * This class is used to test the HCAT_PIG_STORER_EXTERNAL_LOCATION property used by HCatStorer.
+ * When this property is set, HCatStorer writes its output to the location the property specifies.
+ * Since the property can only be set in the UDFContext, this simple wrapper does three things:
+ * <ol>
+ * <li>saves the external dir specified in the Pig script,</li>
+ * <li>sets the same UDFContext signature as HCatStorer, and</li>
+ * <li>sets the external dir in the UDFContext before calling {@link HCatStorer#setStoreLocation(String, Job)}.</li>
+ * </ol>
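+ * <p>
+ * Illustrative use from a Pig script (table name, partition spec, and path are made up):
+ * <pre>
+ * STORE data INTO 'mydb.mytable'
+ *   USING org.apache.hcatalog.pig.HCatStorerWrapper('ds=20130906', '/tmp/external/output');
+ * </pre>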
+ */
+public class HCatStorerWrapper extends HCatStorer {
+
+ private String sign;
+ private String externalDir;
+
+ public HCatStorerWrapper(String partSpecs, String schema, String externalDir) throws Exception {
+ super(partSpecs, schema);
+ this.externalDir = externalDir;
+ }
+
+ public HCatStorerWrapper(String partSpecs, String externalDir) throws Exception {
+ super(partSpecs);
+ this.externalDir = externalDir;
+ }
+
+ public HCatStorerWrapper(String externalDir) throws Exception{
+ super();
+ this.externalDir = externalDir;
+ }
+
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
+ this.getClass(), new String[] { sign });
+ udfProps.setProperty(HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION, externalDir);
+ super.setStoreLocation(location, job);
+ }
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) {
+ sign = signature;
+ super.setStoreFuncUDFContextSignature(signature);
+ }
+}