You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by ha...@apache.org on 2011/11/01 21:14:53 UTC
svn commit: r1196278 - in /incubator/hcatalog/trunk: ./
src/java/org/apache/hcatalog/cli/SemanticAnalysis/
src/java/org/apache/hcatalog/common/
src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/pig/
src/java/org/apache/hcatalog/pig/d...
Author: hashutosh
Date: Tue Nov 1 21:14:50 2011
New Revision: 1196278
URL: http://svn.apache.org/viewvc?rev=1196278&view=rev
Log:
HCATALOG-121 : TextStorageOutputDriver for Pig
Added:
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorage.java
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/AlterTableFileFormatHook.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Nov 1 21:14:50 2011
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES
NEW FEATURES
+ HCAT-121. TextStorageOutputDriver for Pig (daijyc via hashutosh)
+
HCAT-129. HBase Storage Driver Test doesn't use unique test dir for warehouse (toffer via khorgath)
HCAT-109. HBase Storage Handler for HCatalog (avandana via khorgath)
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/AlterTableFileFormatHook.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/AlterTableFileFormatHook.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/AlterTableFileFormatHook.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/AlterTableFileFormatHook.java Tue Nov 1 21:14:50 2011
@@ -40,10 +40,11 @@ import org.apache.hadoop.hive.ql.plan.DD
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.rcfile.RCFileInputDriver;
import org.apache.hcatalog.rcfile.RCFileOutputDriver;
+import org.apache.pig.builtin.PigStorage;
public class AlterTableFileFormatHook extends AbstractSemanticAnalyzerHook {
- private String inDriver, outDriver, tableName;
+ private String inDriver, outDriver, tableName, loader, storer;
@Override
public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast) throws SemanticException {
@@ -65,8 +66,13 @@ public class AlterTableFileFormatHook ex
"You may specify it through INPUT/OUTPUT storage drivers.");
case HiveParser.TOK_TBLTEXTFILE:
- throw new SemanticException("Operation not supported. HCatalog doesn't support Text File by default yet. " +
- "You may specify it through INPUT/OUTPUT storage drivers.");
+ inputFormat = org.apache.hadoop.mapred.TextInputFormat.class.getName();
+ outputFormat = org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat.class.getName();
+ inDriver = org.apache.hcatalog.pig.drivers.LoadFuncBasedInputDriver.class.getName();
+ outDriver = org.apache.hcatalog.pig.drivers.StoreFuncBasedOutputDriver.class.getName();
+ loader = PigStorage.class.getName();
+ storer = PigStorage.class.getName();
+ break;
case HiveParser.TOK_TBLRCFILE:
inputFormat = RCFileInputFormat.class.getName();
@@ -91,6 +97,14 @@ public class AlterTableFileFormatHook ex
hcatProps.put(HCatConstants.HCAT_ISD_CLASS, inDriver);
hcatProps.put(HCatConstants.HCAT_OSD_CLASS, outDriver);
+ if (loader!=null) {
+ hcatProps.put(HCatConstants.HCAT_PIG_LOADER, loader);
+ }
+
+ if (storer!=null) {
+ hcatProps.put(HCatConstants.HCAT_PIG_STORER, storer);
+ }
+
try {
Hive db = context.getHive();
Table tbl = db.getTable(tableName);
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java Tue Nov 1 21:14:50 2011
@@ -47,13 +47,16 @@ import org.apache.hcatalog.common.AuthUt
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.pig.drivers.LoadFuncBasedInputDriver;
+import org.apache.hcatalog.pig.drivers.StoreFuncBasedOutputDriver;
import org.apache.hcatalog.rcfile.RCFileInputDriver;
import org.apache.hcatalog.rcfile.RCFileOutputDriver;
import org.apache.hcatalog.storagehandler.HCatStorageHandler;
+import org.apache.pig.builtin.PigStorage;
final class CreateTableHook extends AbstractSemanticAnalyzerHook {
- private String inStorageDriver, outStorageDriver, tableName;
+ private String inStorageDriver, outStorageDriver, tableName, loader, storer;
@Override
public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context,
@@ -97,10 +100,14 @@ final class CreateTableHook extends Abst
+ "You may specify it through INPUT/OUTPUT storage drivers.");
case HiveParser.TOK_TBLTEXTFILE:
- throw new SemanticException(
- "Operation not supported. HCatalog doesn't support " +
- "Text File by default yet. "
- + "You may specify it through INPUT/OUTPUT storage drivers.");
+ inputFormat = org.apache.hadoop.mapred.TextInputFormat.class.getName();
+ outputFormat = org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat.class.getName();
+ inStorageDriver = org.apache.hcatalog.pig.drivers.LoadFuncBasedInputDriver.class.getName();
+ outStorageDriver = org.apache.hcatalog.pig.drivers.StoreFuncBasedOutputDriver.class.getName();
+ loader = PigStorage.class.getName();
+ storer = PigStorage.class.getName();
+
+ break;
case HiveParser.TOK_LIKETABLE:
@@ -255,6 +262,14 @@ final class CreateTableHook extends Abst
}
}
+
+ if (loader!=null) {
+ tblProps.put(HCatConstants.HCAT_PIG_LOADER, loader);
+ }
+
+ if (storer!=null) {
+ tblProps.put(HCatConstants.HCAT_PIG_STORER, storer);
+ }
if (desc == null) {
// Desc will be null if its CREATE TABLE LIKE. Desc will be
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java Tue Nov 1 21:14:50 2011
@@ -42,6 +42,8 @@ public enum ErrorType {
ERROR_MISSING_PARTITION_KEY (2011, "Partition key value not provided for publish"),
ERROR_MOVE_FAILED (2012, "Moving of data failed during commit"),
ERROR_TOO_MANY_DYNAMIC_PTNS (2013, "Attempt to create too many dynamic partitions"),
+ ERROR_INIT_LOADER (2014, "Error initializing Pig loader"),
+ ERROR_INIT_STORER (2015, "Error initializing Pig storer"),
/* Authorization Errors 3000 - 3999 */
ERROR_ACCESS_CONTROL (3000, "Permission denied"),
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java Tue Nov 1 21:14:50 2011
@@ -30,6 +30,13 @@ public final class HCatConstants {
public static final String HCAT_RCFILE_ISD_CLASS = "org.apache.hcatalog.rcfile.RCFileInputDriver";
public static final String HCAT_RCFILE_OSD_CLASS = "org.apache.hcatalog.rcfile.RCFileOutputDriver";
+ public static final String HCAT_PIG_LOADER = "hcat.pig.loader";
+ public static final String HCAT_PIG_LOADER_ARGS = "hcat.pig.loader.args";
+ public static final String HCAT_PIG_STORER = "hcat.pig.storer";
+ public static final String HCAT_PIG_STORER_ARGS = "hcat.pig.storer.args";
+ public static final String HCAT_PIG_ARGS_DELIMIT = "hcat.pig.args.delimiter";
+ public static final String HCAT_PIG_ARGS_DELIMIT_DEFAULT = ",";
+
//The keys used to store info into the job Configuration
public static final String HCAT_KEY_BASE = "mapreduce.lib.hcat";
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java Tue Nov 1 21:14:50 2011
@@ -157,10 +157,9 @@ public abstract class HCatBaseOutputForm
driver.setPartitionValues(jobContext, partitionValues);
driver.setOutputPath(jobContext, location);
-// HCatUtil.logMap(LOG,"Setting outputPath ["+location+"] for ",partitionValues);
-
driver.initialize(jobContext, jobInfo.getTableInfo().getStorerInfo().getProperties());
-
+
+// HCatUtil.logMap(LOG,"Setting outputPath ["+location+"] for ",partitionValues);
return driver;
} catch(Exception e) {
if (e instanceof HCatException){
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java Tue Nov 1 21:14:50 2011
@@ -52,7 +52,7 @@ public abstract class HCatOutputStorageD
* @return the OutputFormat instance
* @throws IOException Signals that an I/O exception has occurred.
*/
- public abstract OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException;
+ public abstract OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat() throws IOException;
/**
* Set the data location for the output.
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java Tue Nov 1 21:14:50 2011
@@ -53,6 +53,10 @@ public class HCatLoader extends HCatBase
private String hcatServerUri;
private String partitionFilterString;
private final PigHCatUtil phutil = new PigHCatUtil();
+
+ // Signature for wrapped loader, see comments in LoadFuncBasedInputDriver.initialize
+ final public static String INNER_SIGNATURE = "hcatloader.inner.signature";
+ final public static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature";
@Override
public InputFormat<?,?> getInputFormat() throws IOException {
@@ -70,6 +74,7 @@ public class HCatLoader extends HCatBase
@Override
public void setLocation(String location, Job job) throws IOException {
+ job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + signature);
Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
dbName = dbTablePair.first;
tableName = dbTablePair.second;
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Tue Nov 1 21:14:50 2011
@@ -48,9 +48,10 @@ import org.apache.pig.impl.util.UDFConte
public class HCatStorer extends HCatBaseStorer {
- /**
- *
- */
+ // Signature for wrapped storer, see comments in LoadFuncBasedInputDriver.initialize
+ final public static String INNER_SIGNATURE = "hcatstorer.inner.signature";
+ final public static String INNER_SIGNATURE_PREFIX = "hcatstorer_inner_signature";
+
public HCatStorer(String partSpecs, String schema) throws Exception {
super(partSpecs, schema);
@@ -72,6 +73,7 @@ public class HCatStorer extends HCatBase
@Override
public void setStoreLocation(String location, Job job) throws IOException {
+ job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
String[] userStr = location.split("\\.");
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java Tue Nov 1 21:14:50 2011
@@ -18,6 +18,8 @@
package org.apache.hcatalog.pig.drivers;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -28,12 +30,17 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
+import org.apache.hcatalog.pig.HCatLoader;
import org.apache.hcatalog.pig.PigHCatUtil;
import org.apache.pig.LoadFunc;
+import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
@@ -44,7 +51,7 @@ import org.apache.pig.data.Tuple;
* and override the initialize(). {@link PigStorageInputDriver} illustrates
* that well.
*/
-public abstract class LoadFuncBasedInputDriver extends HCatInputStorageDriver{
+public class LoadFuncBasedInputDriver extends HCatInputStorageDriver{
private LoadFuncBasedInputFormat inputFormat;
private HCatSchema dataSchema;
@@ -99,9 +106,51 @@ public abstract class LoadFuncBasedInput
@Override
public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
-
+
+ String loaderString = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_LOADER);
+ if (loaderString==null) {
+ throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Don't know how to instantiate loader, " + HCatConstants.HCAT_PIG_LOADER + " property is not defined for table ");
+ }
+ String loaderArgs = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_LOADER_ARGS);
+
+ String[] args;
+ if (loaderArgs!=null) {
+ String delimit = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_ARGS_DELIMIT);
+ if (delimit==null) {
+ delimit = HCatConstants.HCAT_PIG_ARGS_DELIMIT_DEFAULT;
+ }
+ args = loaderArgs.split(delimit);
+ } else {
+ args = new String[0];
+ }
+
+ try {
+ Class loaderClass = Class.forName(loaderString);
+
+ Constructor[] constructors = loaderClass.getConstructors();
+ for (Constructor constructor : constructors) {
+ if (constructor.getParameterTypes().length==args.length) {
+ lf = (LoadFunc)constructor.newInstance(args);
+ break;
+ }
+ }
+ } catch (Exception e) {
+ throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Cannot instantiate " + loaderString, e);
+ }
+
+ if (lf==null) {
+ throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Cannot instantiate " + loaderString + " with construct args " + loaderArgs);
+ }
+
+ // We need to set the right signature in setLocation. The original signature is used by
+ // HCatLoader to access UDFContext, so we need to invent a new signature for
+ // the wrapped loader.
+ // For PigStorage/JsonStorage, setting the signature right before setLocation seems to be
+ // sufficient; we may need to set the signature more aggressively if we support more loaders.
+ String innerSignature = context.getConfiguration().get(HCatLoader.INNER_SIGNATURE);
+ lf.setUDFContextSignature(innerSignature);
lf.setLocation(location, new Job(context.getConfiguration()));
- inputFormat = new LoadFuncBasedInputFormat(lf, PigHCatUtil.getResourceSchema(dataSchema));
+ inputFormat = new LoadFuncBasedInputFormat(lf, PigHCatUtil.getResourceSchema(dataSchema), location, context.getConfiguration());
}
private String location;
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java Tue Nov 1 21:14:50 2011
@@ -20,14 +20,17 @@ package org.apache.hcatalog.pig.drivers;
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.builtin.PigStorage;
@@ -42,10 +45,15 @@ public class LoadFuncBasedInputFormat ex
private final LoadFunc loadFunc;
private static ResourceFieldSchema[] fields;
- public LoadFuncBasedInputFormat(LoadFunc loadFunc, ResourceSchema dataSchema) {
+ public LoadFuncBasedInputFormat(LoadFunc loadFunc, ResourceSchema dataSchema, String location, Configuration conf) throws IOException {
this.loadFunc = loadFunc;
fields = dataSchema.getFields();
+
+ // Simulate the frontend call sequence for LoadFunc, in case the LoadFunc needs to store something into UDFContext (as JsonLoader does)
+ if (loadFunc instanceof LoadMetadata) {
+ ((LoadMetadata)loadFunc).getSchema(location, new Job(conf));
+ }
}
@Override
@@ -59,7 +67,6 @@ public class LoadFuncBasedInputFormat ex
@Override
public List<InputSplit> getSplits(JobContext jobContext) throws IOException,
InterruptedException {
-
try {
InputFormat<BytesWritable,Tuple> inpFormat = loadFunc.getInputFormat();
return inpFormat.getSplits(jobContext);
@@ -103,51 +110,57 @@ public class LoadFuncBasedInputFormat ex
for(int i = 0; i < tupleFromDisk.size(); i++) {
- DataByteArray dba = (DataByteArray) tupleFromDisk.get(i);
-
- if(dba == null) {
- // PigStorage will insert nulls for empty fields.
- tupleFromDisk.set(i, null);
- continue;
- }
-
- switch(fields[i].getType()) {
-
- case DataType.CHARARRAY:
- tupleFromDisk.set(i, caster.bytesToCharArray(dba.get()));
- break;
-
- case DataType.INTEGER:
- tupleFromDisk.set(i, caster.bytesToInteger(dba.get()));
- break;
-
- case DataType.FLOAT:
- tupleFromDisk.set(i, caster.bytesToFloat(dba.get()));
- break;
-
- case DataType.LONG:
- tupleFromDisk.set(i, caster.bytesToLong(dba.get()));
- break;
-
- case DataType.DOUBLE:
- tupleFromDisk.set(i, caster.bytesToDouble(dba.get()));
- break;
-
- case DataType.MAP:
- tupleFromDisk.set(i, caster.bytesToMap(dba.get()));
- break;
-
- case DataType.BAG:
- tupleFromDisk.set(i, caster.bytesToBag(dba.get(), fields[i]));
- break;
-
- case DataType.TUPLE:
- tupleFromDisk.set(i, caster.bytesToTuple(dba.get(), fields[i]));
- break;
-
- default:
- throw new IOException("Unknown Pig type in data: "+fields[i].getType());
- }
+ Object data = tupleFromDisk.get(i);
+
+ // We will do conversion for bytes only for now
+ if (data instanceof DataByteArray) {
+
+ DataByteArray dba = (DataByteArray) data;
+
+ if(dba == null) {
+ // PigStorage will insert nulls for empty fields.
+ tupleFromDisk.set(i, null);
+ continue;
+ }
+
+ switch(fields[i].getType()) {
+
+ case DataType.CHARARRAY:
+ tupleFromDisk.set(i, caster.bytesToCharArray(dba.get()));
+ break;
+
+ case DataType.INTEGER:
+ tupleFromDisk.set(i, caster.bytesToInteger(dba.get()));
+ break;
+
+ case DataType.FLOAT:
+ tupleFromDisk.set(i, caster.bytesToFloat(dba.get()));
+ break;
+
+ case DataType.LONG:
+ tupleFromDisk.set(i, caster.bytesToLong(dba.get()));
+ break;
+
+ case DataType.DOUBLE:
+ tupleFromDisk.set(i, caster.bytesToDouble(dba.get()));
+ break;
+
+ case DataType.MAP:
+ tupleFromDisk.set(i, caster.bytesToMap(dba.get()));
+ break;
+
+ case DataType.BAG:
+ tupleFromDisk.set(i, caster.bytesToBag(dba.get(), fields[i]));
+ break;
+
+ case DataType.TUPLE:
+ tupleFromDisk.set(i, caster.bytesToTuple(dba.get(), fields[i]));
+ break;
+
+ default:
+ throw new IOException("Unknown Pig type in data: "+fields[i].getType());
+ }
+ }
}
return tupleFromDisk;
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java?rev=1196278&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java Tue Nov 1 21:14:50 2011
@@ -0,0 +1,130 @@
+package org.apache.hcatalog.pig.drivers;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.FileOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.pig.HCatLoader;
+import org.apache.hcatalog.pig.HCatStorer;
+import org.apache.hcatalog.pig.PigHCatUtil;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+public class StoreFuncBasedOutputDriver extends FileOutputStorageDriver {
+
+ protected StoreFuncInterface sf;
+ private TupleFactory factory = TupleFactory.getInstance();
+ private HCatSchema schema;
+ private String location;
+
+ @Override
+ public void initialize(JobContext jobContext, Properties hcatProperties) throws IOException {
+ String storerString = hcatProperties.getProperty(HCatConstants.HCAT_PIG_STORER);
+ if (storerString==null) {
+ throw new HCatException(ErrorType.ERROR_INIT_STORER, "Don't know how to instantiate storer, " + HCatConstants.HCAT_PIG_STORER + " property is not defined for table ");
+ }
+ String storerArgs = hcatProperties.getProperty(HCatConstants.HCAT_PIG_STORER_ARGS);
+
+ String[] args;
+ if (storerArgs!=null) {
+ String delimit = hcatProperties.getProperty(HCatConstants.HCAT_PIG_ARGS_DELIMIT);
+ if (delimit==null) {
+ delimit = HCatConstants.HCAT_PIG_ARGS_DELIMIT_DEFAULT;
+ }
+ args = storerArgs.split(delimit);
+ } else {
+ args = new String[0];
+ }
+
+ try {
+ Class storerClass = Class.forName(storerString);
+
+ Constructor[] constructors = storerClass.getConstructors();
+ for (Constructor constructor : constructors) {
+ if (constructor.getParameterTypes().length==args.length) {
+ sf = (StoreFuncInterface)constructor.newInstance(args);
+ break;
+ }
+ }
+ } catch (Exception e) {
+ throw new HCatException(ErrorType.ERROR_INIT_STORER, "Cannot instantiate " + storerString, e);
+ }
+
+ if (sf==null) {
+ throw new HCatException(ErrorType.ERROR_INIT_STORER, "Cannot instantiate " + storerString + " with construct args " + storerArgs);
+ }
+
+ super.initialize(jobContext, hcatProperties);
+
+ Job job = new Job(jobContext.getConfiguration());
+ String innerSignature = jobContext.getConfiguration().get(HCatStorer.INNER_SIGNATURE);
+
+ // Set the signature before invoking StoreFunc methods;
+ // see comments in LoadFuncBasedInputDriver.initialize
+ sf.setStoreFuncUDFContextSignature(innerSignature);
+ sf.checkSchema(PigHCatUtil.getResourceSchema(schema));
+
+ sf.setStoreLocation(location, job);
+ ConfigurationUtil.mergeConf(jobContext.getConfiguration(),
+ job.getConfiguration());
+ }
+
+ @Override
+ public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat()
+ throws IOException {
+ StoreFuncBasedOutputFormat outputFormat = new StoreFuncBasedOutputFormat(sf);
+ return outputFormat;
+ }
+
+ @Override
+ public void setOutputPath(JobContext jobContext, String location)
+ throws IOException {
+ this.location = location;
+ }
+
+ @Override
+ public void setSchema(JobContext jobContext, HCatSchema schema)
+ throws IOException {
+ this.schema = schema;
+ }
+
+ @Override
+ public void setPartitionValues(JobContext jobContext,
+ Map<String, String> partitionValues) throws IOException {
+ // Do nothing; partition keys are not stored along with the data, so they can be ignored
+ }
+
+ @Override
+ public WritableComparable<?> generateKey(HCatRecord value)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public Writable convertValue(HCatRecord value) throws IOException {
+ Tuple t = factory.newTupleNoCopy(value.getAll());
+ return t;
+ }
+
+}
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java?rev=1196278&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java Tue Nov 1 21:14:50 2011
@@ -0,0 +1,153 @@
+package org.apache.hcatalog.pig.drivers;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.pig.PigHCatUtil;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Adapts a Pig {@link StoreFuncInterface} to Hadoop's {@link OutputFormat}
+ * so HCatalog can write table data through an arbitrary Pig storer.
+ * Delegates format operations to the StoreFunc's own OutputFormat and wraps
+ * its RecordWriter/OutputCommitter to drive the StoreFunc lifecycle
+ * (prepareToWrite, putNext, storeSchema).
+ */
+public class StoreFuncBasedOutputFormat extends
+ OutputFormat<BytesWritable, Tuple> {
+
+ private final StoreFuncInterface storeFunc;
+
+ public StoreFuncBasedOutputFormat(StoreFuncInterface storeFunc) {
+
+ this.storeFunc = storeFunc;
+ }
+
+ // Delegates spec checking to the StoreFunc's underlying OutputFormat.
+ @Override
+ public void checkOutputSpecs(JobContext jobContext) throws IOException,
+ InterruptedException {
+ OutputFormat<BytesWritable,Tuple> outputFormat = storeFunc.getOutputFormat();
+ outputFormat.checkOutputSpecs(jobContext);
+ }
+
+ // Rebuilds the output schema and location from the serialized OutputJobInfo
+ // stored in the job configuration, then wraps the delegate committer so
+ // schema metadata can be stored at job commit time.
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
+ throws IOException, InterruptedException {
+ String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(serializedJobInfo);
+ ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
+ String location = outputJobInfo.getLocation();
+ OutputFormat<BytesWritable,Tuple> outputFormat = storeFunc.getOutputFormat();
+ return new StoreFuncBasedOutputCommitter(storeFunc, outputFormat.getOutputCommitter(ctx), location, rs);
+ }
+
+ // Wraps the delegate RecordWriter so every write goes through the
+ // StoreFunc's putNext. Schema/location are recovered the same way as in
+ // getOutputCommitter.
+ @Override
+ public RecordWriter<BytesWritable, Tuple> getRecordWriter(
+ TaskAttemptContext ctx) throws IOException, InterruptedException {
+ RecordWriter<BytesWritable,Tuple> writer = storeFunc.getOutputFormat().getRecordWriter(ctx);
+ String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(serializedJobInfo);
+ ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
+ String location = outputJobInfo.getLocation();
+ return new StoreFuncBasedRecordWriter(writer, storeFunc, location, rs);
+ }
+
+ /**
+ * RecordWriter that routes records through the StoreFunc: the constructor
+ * binds the StoreFunc to the delegate writer (prepareToWrite), and write()
+ * calls putNext with the Tuple value, ignoring the key.
+ */
+ static class StoreFuncBasedRecordWriter extends RecordWriter<BytesWritable, Tuple> {
+ private final RecordWriter<BytesWritable,Tuple> writer;
+ private final StoreFuncInterface storeFunc;
+ private final ResourceSchema schema;
+ private final String location;
+
+ public StoreFuncBasedRecordWriter(RecordWriter<BytesWritable,Tuple> writer, StoreFuncInterface sf, String location, ResourceSchema rs) throws IOException {
+ this.writer = writer;
+ this.storeFunc = sf;
+ this.schema = rs;
+ this.location = location;
+ storeFunc.prepareToWrite(writer);
+ }
+
+ @Override
+ public void close(TaskAttemptContext ctx) throws IOException,
+ InterruptedException {
+ writer.close(ctx);
+ }
+
+ // Key is ignored; the StoreFunc writes the Tuple via the delegate
+ // writer it was bound to in the constructor.
+ @Override
+ public void write(BytesWritable key, Tuple value) throws IOException,
+ InterruptedException {
+ storeFunc.putNext(value);
+ }
+ }
+
+ /**
+ * Committer that forwards every lifecycle call to the wrapped delegate
+ * committer and, if the StoreFunc also implements StoreMetadata, stores
+ * the output schema when the job completes.
+ */
+ static class StoreFuncBasedOutputCommitter extends OutputCommitter {
+ StoreFuncInterface sf;
+ OutputCommitter wrappedOutputCommitter;
+ String location;
+ ResourceSchema rs;
+ public StoreFuncBasedOutputCommitter(StoreFuncInterface sf, OutputCommitter outputCommitter, String location, ResourceSchema rs) {
+ this.sf = sf;
+ this.wrappedOutputCommitter = outputCommitter;
+ this.location = location;
+ this.rs = rs;
+ }
+ @Override
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ wrappedOutputCommitter.abortTask(context);
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext context) throws IOException {
+ wrappedOutputCommitter.commitTask(context);
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext context)
+ throws IOException {
+ return wrappedOutputCommitter.needsTaskCommit(context);
+ }
+
+ @Override
+ public void setupJob(JobContext context) throws IOException {
+ wrappedOutputCommitter.setupJob(context);
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext context) throws IOException {
+ wrappedOutputCommitter.setupTask(context);
+ }
+
+ // No @Override: commitJob does not exist on older Hadoop OutputCommitter
+ // versions — presumably kept annotation-free for cross-version compiles;
+ // TODO confirm against the supported Hadoop versions.
+ public void commitJob(JobContext context) throws IOException {
+ wrappedOutputCommitter.commitJob(context);
+ if (sf instanceof StoreMetadata) {
+ if (rs != null) {
+ ((StoreMetadata) sf).storeSchema(
+ rs, location, new Job(context.getConfiguration()) );
+ }
+ }
+ }
+
+ // NOTE(review): both commitJob and cleanupJob call storeSchema; on Hadoop
+ // versions that invoke both hooks the schema may be stored twice —
+ // verify storeSchema is idempotent for the supported StoreFuncs.
+ @Override
+ public void cleanupJob(JobContext context) throws IOException {
+ wrappedOutputCommitter.cleanupJob(context);
+ if (sf instanceof StoreMetadata) {
+ if (rs != null) {
+ ((StoreMetadata) sf).storeSchema(
+ rs, location, new Job(context.getConfiguration()) );
+ }
+ }
+ }
+
+ public void abortJob(JobContext context, JobStatus.State state) throws IOException {
+ wrappedOutputCommitter.abortJob(context, state);
+ }
+ }
+}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java Tue Nov 1 21:14:50 2011
@@ -95,7 +95,7 @@ import org.apache.hcatalog.mapreduce.Fil
*/
@SuppressWarnings("unchecked")
@Override
- public OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException {
+ public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat() throws IOException {
if( outputFormat == null ) {
outputFormat = new RCFileMapReduceOutputFormat();
}
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java Tue Nov 1 21:14:50 2011
@@ -348,9 +348,7 @@ public class TestSemanticAnalysis extend
query = "create table junit_sem_analysis (a int) partitioned by (b string) stored as TEXTFILE";
CommandProcessorResponse response = hcatDriver.run(query);
- assertEquals(10,response.getResponseCode());
- assertEquals("FAILED: Error in semantic analysis: Operation not supported. HCatalog doesn't support Text File by default yet. You may specify it through INPUT/OUTPUT storage drivers.",
- response.getErrorMessage());
+ assertEquals(0,response.getResponseCode());
}
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorage.java?rev=1196278&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorage.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorage.java Tue Nov 1 21:14:50 2011
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hcatalog.pig.drivers.PigStorageInputDriver;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Test helper storer: a PigStorage subclass taking two constructor
+ * arguments, used to exercise multi-argument StoreFunc construction
+ * (see testMultiConstructArgs). The second argument is appended as an
+ * extra field to every tuple before it is stored.
+ */
+public class MyPigStorage extends PigStorage {
+
+ String arg2;
+ public MyPigStorage(String arg1, String arg2) throws IOException {
+ super(arg1);
+ this.arg2 = arg2;
+ }
+
+ // Appends the second constructor arg to the tuple, then delegates the
+ // actual write to PigStorage.
+ @Override
+ public void putNext(Tuple t) throws IOException {
+ t.append(arg2);
+ super.putNext(t);
+ }
+}
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java Tue Nov 1 21:14:50 2011
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hcatalog.pig;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hcatalog.pig.drivers.PigStorageInputDriver;
-
-public class MyPigStorageDriver extends PigStorageInputDriver{
-
- @Override
- public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
- if ( !"control-A".equals(storageDriverArgs.getProperty(PigStorageInputDriver.delim))){
- /* This is the only way to make testcase fail. Throwing exception from
- * here doesn't propagate up.
- */
- System.exit(1);
- }
- super.initialize(context, storageDriverArgs);
- }
-}
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java Tue Nov 1 21:14:50 2011
@@ -21,7 +21,9 @@ import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileWriter;
import java.io.IOException;
+import java.io.PrintWriter;
import java.util.Iterator;
import java.util.Map;
@@ -45,8 +47,8 @@ import org.apache.hadoop.hive.ql.io.RCFi
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.pig.HCatLoader;
-import org.apache.hcatalog.pig.drivers.PigStorageInputDriver;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
@@ -59,6 +61,9 @@ public class TestPigStorageDriver extend
private HiveConf hcatConf;
private Driver hcatDriver;
private HiveMetaStoreClient msc;
+ private static String tblLocation = "/tmp/test_pig/data";
+ private static String anyExistingFileInCurDir = "ivy.xml";
+ private static String warehouseDir = "/tmp/hcat_junit_warehouse";
@Override
protected void setUp() throws Exception {
@@ -81,57 +86,91 @@ public class TestPigStorageDriver extend
public void testPigStorageDriver() throws IOException, CommandNeedRetryException{
-
String fsLoc = hcatConf.get("fs.default.name");
- Path tblPath = new Path(fsLoc, "/tmp/test_pig/data");
- String anyExistingFileInCurDir = "ivy.xml";
+ Path tblPath = new Path(fsLoc, tblLocation);
+ String tblName = "junit_pigstorage";
tblPath.getFileSystem(hcatConf).copyFromLocalFile(new Path(anyExistingFileInCurDir),tblPath);
- hcatDriver.run("drop table junit_pigstorage");
+ hcatDriver.run("drop table " + tblName);
CommandProcessorResponse resp;
- String createTable = "create table junit_pigstorage (a string) partitioned by (b string) stored as RCFILE";
+ String createTable = "create table " + tblName + " (a string) partitioned by (b string) stored as TEXTFILE";
resp = hcatDriver.run(createTable);
assertEquals(0, resp.getResponseCode());
assertNull(resp.getErrorMessage());
- resp = hcatDriver.run("alter table junit_pigstorage add partition (b='2010-10-10') location '"+new Path(fsLoc, "/tmp/test_pig")+"'");
+ resp = hcatDriver.run("alter table " + tblName + " add partition (b='2010-10-10') location '"+new Path(fsLoc, "/tmp/test_pig")+"'");
assertEquals(0, resp.getResponseCode());
assertNull(resp.getErrorMessage());
- resp = hcatDriver.run("alter table junit_pigstorage partition (b='2010-10-10') set fileformat inputformat '" + RCFileInputFormat.class.getName()
- +"' outputformat '"+RCFileOutputFormat.class.getName()+"' inputdriver '"+PigStorageInputDriver.class.getName()+"' outputdriver 'non-existent'");
+ resp = hcatDriver.run("alter table " + tblName + " partition (b='2010-10-10') set fileformat TEXTFILE");
assertEquals(0, resp.getResponseCode());
assertNull(resp.getErrorMessage());
- resp = hcatDriver.run("desc extended junit_pigstorage partition (b='2010-10-10')");
+ resp = hcatDriver.run("desc extended " + tblName + " partition (b='2010-10-10')");
assertEquals(0, resp.getResponseCode());
assertNull(resp.getErrorMessage());
PigServer server = new PigServer(ExecType.LOCAL, hcatConf.getAllProperties());
UDFContext.getUDFContext().setClientSystemProps();
- server.registerQuery(" a = load 'junit_pigstorage' using "+HCatLoader.class.getName()+";");
+ server.registerQuery(" a = load '" + tblName + "' using "+HCatLoader.class.getName()+";");
Iterator<Tuple> itr = server.openIterator("a");
- DataInputStream stream = new DataInputStream(new BufferedInputStream(new FileInputStream(new File(anyExistingFileInCurDir))));
- while(itr.hasNext()){
- Tuple t = itr.next();
- assertEquals(2, t.size());
- if(t.get(0) != null) {
- // If underlying data-field is empty. PigStorage inserts null instead
- // of empty String objects.
- assertTrue(t.get(0) instanceof String);
- assertEquals(stream.readLine(), t.get(0));
- }
- else{
- assertTrue(stream.readLine().isEmpty());
+ boolean result = compareWithFile(itr, anyExistingFileInCurDir, 2, "2010-10-10", null);
+ assertTrue(result);
+
+ server.registerQuery("a = load '"+tblPath.toString()+"' using PigStorage() as (a:chararray);");
+ server.store("a", tblName, HCatStorer.class.getName() + "('b=2010-10-11')");
+
+ server.registerQuery("a = load '" + warehouseDir + "/" + tblName + "/b=2010-10-11' using PigStorage() as (a:chararray);");
+ itr = server.openIterator("a");
+ result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-10-11", null);
+ assertTrue(result);
+
+ // Test multi-store
+ server.registerQuery("a = load '"+tblPath.toString()+"' using PigStorage() as (a:chararray);");
+ server.registerQuery("store a into '" + tblName + "' using " + HCatStorer.class.getName() + "('b=2010-11-01');");
+ server.registerQuery("store a into '" + tblName + "' using " + HCatStorer.class.getName() + "('b=2010-11-02');");
+
+ server.registerQuery("a = load '" + warehouseDir + "/" + tblName + "/b=2010-11-01' using PigStorage() as (a:chararray);");
+ itr = server.openIterator("a");
+ result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-11-01", null);
+ assertTrue(result);
+
+ server.registerQuery("a = load '" + warehouseDir + "/" + tblName + "/b=2010-11-02' using PigStorage() as (a:chararray);");
+ itr = server.openIterator("a");
+ result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-11-02", null);
+ assertTrue(result);
+
+ hcatDriver.run("drop table " + tblName);
+ }
+
+ private boolean compareWithFile(Iterator<Tuple> itr, String factFile, int numColumn, String key, String valueSuffix) throws IOException {
+ DataInputStream stream = new DataInputStream(new BufferedInputStream(new FileInputStream(new File(factFile))));
+ while(itr.hasNext()){
+ Tuple t = itr.next();
+ assertEquals(numColumn, t.size());
+ if(t.get(0) != null) {
+ // If underlying data-field is empty. PigStorage inserts null instead
+ // of empty String objects.
+ assertTrue(t.get(0) instanceof String);
+ String expected = stream.readLine();
+ if (valueSuffix!=null)
+ expected += valueSuffix;
+ assertEquals(expected, t.get(0));
+ }
+ else{
+ assertTrue(stream.readLine().isEmpty());
+ }
+
+ if (numColumn>1) {
+ // The second column must be key
+ assertTrue(t.get(1) instanceof String);
+ assertEquals(key, t.get(1));
+ }
}
- assertTrue(t.get(1) instanceof String);
-
- assertEquals("2010-10-10", t.get(1));
- }
- assertEquals(0,stream.available());
- stream.close();
- hcatDriver.run("drop table junit_pigstorage");
+ assertEquals(0,stream.available());
+ stream.close();
+ return true;
}
public void testDelim() throws MetaException, TException, UnknownTableException, NoSuchObjectException, InvalidOperationException, IOException, CommandNeedRetryException{
@@ -139,7 +178,7 @@ public class TestPigStorageDriver extend
hcatDriver.run("drop table junit_pigstorage_delim");
CommandProcessorResponse resp;
- String createTable = "create table junit_pigstorage_delim (a string) partitioned by (b string) stored as RCFILE";
+ String createTable = "create table junit_pigstorage_delim (a0 string, a1 string) partitioned by (b string) stored as RCFILE";
resp = hcatDriver.run(createTable);
@@ -150,12 +189,12 @@ public class TestPigStorageDriver extend
assertEquals(0, resp.getResponseCode());
assertNull(resp.getErrorMessage());
- resp = hcatDriver.run("alter table junit_pigstorage_delim partition (b='2010-10-10') set fileformat inputformat '" + RCFileInputFormat.class.getName()
- +"' outputformat '"+RCFileOutputFormat.class.getName()+"' inputdriver '"+MyPigStorageDriver.class.getName()+"' outputdriver 'non-existent'");
+ resp = hcatDriver.run("alter table junit_pigstorage_delim partition (b='2010-10-10') set fileformat TEXTFILE");
Partition part = msc.getPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, "junit_pigstorage_delim", "b=2010-10-10");
Map<String,String> partParms = part.getParameters();
- partParms.put(PigStorageInputDriver.delim, "control-A");
+ partParms.put(HCatConstants.HCAT_PIG_LOADER_ARGS, "control-A");
+ partParms.put(HCatConstants.HCAT_PIG_STORER_ARGS, "control-A");
msc.alter_partition(MetaStoreUtils.DEFAULT_DATABASE_NAME, "junit_pigstorage_delim", part);
@@ -165,5 +204,67 @@ public class TestPigStorageDriver extend
try{
server.openIterator("a");
}catch(FrontendException fe){}
+
+ resp = hcatDriver.run("alter table junit_pigstorage_delim set fileformat TEXTFILE");
+ assertEquals(0, resp.getResponseCode());
+ assertNull(resp.getErrorMessage());
+ resp = hcatDriver.run("alter table junit_pigstorage_delim set TBLPROPERTIES ('hcat.pig.loader.args'=':', 'hcat.pig.storer.args'=':')");
+ assertEquals(0, resp.getResponseCode());
+ assertNull(resp.getErrorMessage());
+
+ File inputFile = File.createTempFile("hcat_test", "");
+ PrintWriter p = new PrintWriter(new FileWriter(inputFile));
+ p.println("1\t2");
+ p.println("3\t4");
+ p.close();
+ server.registerQuery("a = load '"+inputFile.toString()+"' using PigStorage() as (a0:chararray, a1:chararray);");
+ server.store("a", "junit_pigstorage_delim", HCatStorer.class.getName() + "('b=2010-10-11')");
+
+ server.registerQuery("a = load '/tmp/hcat_junit_warehouse/junit_pigstorage_delim/b=2010-10-11' using PigStorage() as (a:chararray);");
+ Iterator<Tuple> itr = server.openIterator("a");
+
+ assertTrue(itr.hasNext());
+ Tuple t = itr.next();
+ assertTrue(t.get(0).equals("1:2"));
+
+ assertTrue(itr.hasNext());
+ t = itr.next();
+ assertTrue(t.get(0).equals("3:4"));
+
+ assertFalse(itr.hasNext());
+ inputFile.delete();
}
+
+ public void testMultiConstructArgs() throws MetaException, TException, UnknownTableException, NoSuchObjectException, InvalidOperationException, IOException, CommandNeedRetryException{
+
+ String fsLoc = hcatConf.get("fs.default.name");
+ Path tblPath = new Path(fsLoc, tblLocation);
+ String tblName = "junit_pigstorage_constructs";
+ tblPath.getFileSystem(hcatConf).copyFromLocalFile(new Path(anyExistingFileInCurDir),tblPath);
+
+ hcatDriver.run("drop table junit_pigstorage_constructs");
+
+ CommandProcessorResponse resp;
+ String createTable = "create table " + tblName + " (a string) partitioned by (b string) stored as TEXTFILE";
+
+ resp = hcatDriver.run(createTable);
+
+ assertEquals(0, resp.getResponseCode());
+ assertNull(resp.getErrorMessage());
+
+ resp = hcatDriver.run("alter table " + tblName + " set TBLPROPERTIES ('hcat.pig.storer'='org.apache.hcatalog.pig.MyPigStorage', 'hcat.pig.storer.args'=':#hello', 'hcat.pig.args.delimiter'='#')");
+ assertEquals(0, resp.getResponseCode());
+ assertNull(resp.getErrorMessage());
+
+ PigServer server = new PigServer(ExecType.LOCAL, hcatConf.getAllProperties());
+ UDFContext.getUDFContext().setClientSystemProps();
+
+ server.registerQuery("a = load '"+tblPath.toString()+"' using PigStorage() as (a:chararray);");
+ server.store("a", tblName, HCatStorer.class.getName() + "('b=2010-10-11')");
+
+ server.registerQuery("a = load '" + warehouseDir + "/" + tblName + "/b=2010-10-11' using PigStorage() as (a:chararray);");
+ Iterator<Tuple> itr = server.openIterator("a");
+ boolean result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-10-11", ":hello");
+ assertTrue(result);
+ }
}
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java Tue Nov 1 21:14:50 2011
@@ -48,7 +48,7 @@ public class HBaseBulkOutputStorageDrive
}
@Override
- public OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException {
+ public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat() throws IOException {
return outputFormat;
}
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java Tue Nov 1 21:14:50 2011
@@ -41,7 +41,7 @@ public class HBaseDirectOutputStorageDri
}
@Override
- public OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException {
+ public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat() throws IOException {
return outputFormat;
}
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java Tue Nov 1 21:14:50 2011
@@ -60,7 +60,7 @@ public class HBaseOutputStorageDriver ex
}
@Override
- public OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException {
+ public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat() throws IOException {
return activeOSD.getOutputFormat();
}