Posted to commits@pig.apache.org by ga...@apache.org on 2010/11/16 19:23:23 UTC
svn commit: r1035726 - in /pig/trunk/contrib: ./
piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
piggybank/java/src/main/java/org/apache/pig/piggybank/storage/allloader/
piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/
Author: gates
Date: Tue Nov 16 18:23:23 2010
New Revision: 1035726
URL: http://svn.apache.org/viewvc?rev=1035726&view=rev
Log:
PIG-1722 PiggyBank AllLoader - Load multiple file formats in one load statement.
Added:
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/AllLoader.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/allloader/
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/allloader/LoadFuncHelper.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAllLoader.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLoadFuncHelper.java
Modified:
pig/trunk/contrib/CHANGES.txt
Modified: pig/trunk/contrib/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/CHANGES.txt?rev=1035726&r1=1035725&r2=1035726&view=diff
==============================================================================
--- pig/trunk/contrib/CHANGES.txt (original)
+++ pig/trunk/contrib/CHANGES.txt Tue Nov 16 18:23:23 2010
@@ -24,6 +24,9 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1722 PiggyBank AllLoader - Load multiple file formats in one load
+statement (gerritjw via gates)
+
PIG-1502 Remove Owl as a contrib project (gates)
PIG-1386 UDF to extend functionalities of MaxTupleBy1stField (hcbusy via olgan)
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/AllLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/AllLoader.java?rev=1035726&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/AllLoader.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/AllLoader.java Tue Nov 16 18:23:23 2010
@@ -0,0 +1,730 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.log4j.Logger;
+import org.apache.pig.Expression;
+import org.apache.pig.FileInputLoadFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.piggybank.storage.allloader.LoadFuncHelper;
+import org.apache.pig.piggybank.storage.partition.PathPartitionHelper;
+
+/**
+ * The AllLoader provides the ability to point pig at a folder that contains
+ * files in multiple formats e.g. PlainText, Gz, Bz, Lzo, HiveRC etc and have
+ * the LoadFunc(s) automatically selected based on the file extension. <br/>
+ * <b>How this works:</b><br/>
+ * The file extensions are mapped in the pig.properties via the property
+ * file.extension.loaders.
+ *
+ * <p/>
+ * <b>file.extension.loaders format</b>
+ * <ul>
+ * <li>[file extension]:[loader func spec]</li>
+ * <li>[file-extension]:[optional path tag]:[loader func spec]</li>
+ * <li>[file-extension]:[optional path tag]:[sequence file key value writer
+ * class name]:[loader func spec]</li>
+ * </ul>
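+ * <p/>
+ * For example, a single pig.properties entry could map two extensions (the
+ * loaders shown here mirror those used in TestAllLoader):<br/>
+ * file.extension.loaders=gz:org.apache.pig.builtin.PigStorage(','),
+ * rc:org.apache.pig.piggybank.storage.HiveColumnarLoader('v1 float,v2 float')<br/>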
+ *
+ * <p/>
+ * The file.extension.loaders property associates pig loaders with file
+ * extensions. If a file does not have an extension, the AllLoader will look at
+ * the first three bytes of the file and try to guess its format based on:
+ * <ul>
+ * <li>[ -119, 76, 90 ] = lzo</li>
+ * <li>[ 31, -117, 8 ] = gz</li>
+ * <li>[ 66, 90, 104 ] = bz2</li>
+ * <li>[ 83, 69, 81 ] = seq</li>
+ * </ul>
+ * <br/>
+ * The loader associated with that extension will then be used.
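+ * E.g. a file without an extension whose first three bytes are
+ * [ 31, -117, 8 ] is treated as a gz file and loaded with the loader mapped
+ * to the gz extension; if the bytes match no entry, plain text is assumed
+ * and PigStorage is used.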
+ * <p/>
+ *
+ * <b>Path partitioning</b> The AllLoader supports hive style path partitioning
+ * e.g. /log/type1/daydate=2010-11-01<br/>
+ * "daydate" will be considered a partition key and filters can be written
+ * against this.<br/>
+ * Note that the filter should go into the AllLoader constructor e.g.<br/>
+ * a = LOAD 'input' using AllLoader('daydate<\"2010-11-01\"')<br/>
+ *
+ * <b>Path tags</b> AllLoader supports configuring different loaders for the
+ * same extension based on their file path.<br/>
+ * E.g.<br/>
+ * We have the paths /log/type1, /log/type2<br/>
+ * For each of these directories we'd like to use different loaders.<br/>
+ * So we set up our loaders:<br/>
+ * file.extension.loaders:gz:type1:MyType1Loader, gz:type2:MyType2Loader<br/>
+ *
+ *
+ * <p/>
+ * <b>Sequence files</b> Sequence files also support using the Path tags for
+ * loader selection but have an extra configuration option that relates to the
+ * Key Class used to write the Sequence file.<br/>
+ * E.g. for HiveRC this value is: org.apache.hadoop.hive.ql.io.RCFile so we can
+ * setup our sequence file formatting:<br/>
+ * file.extension.loaders:seq::org.apache.hadoop.hive.ql.io.RCFile:
+ * MyHiveRCLoader, seq::DefaultSequenceFileLoader<br/>
+ *
+ * <p/>
+ * <b>Schema</b> The JsonMetadata schema loader is supported and the schema
+ * will be loaded using this loader.<br/>
+ * In case this fails, the schema can be loaded using the default schema
+ * provided.
+ *
+ */
+public class AllLoader extends FileInputLoadFunc implements LoadMetadata,
+ StoreMetadata, LoadPushDown {
+
+ private static final Logger LOG = Logger.getLogger(AllLoader.class);
+
+ private static final String PROJECTION_ID = AllLoader.class.getName()
+ + ".projection";
+
+ transient LoadFunc childLoadFunc;
+ transient boolean supportPushDownProjection = false;
+ transient RequiredFieldList requiredFieldList;
+ transient SortedSet<Integer> requiredFieldHashSet;
+
+ transient TupleFactory tupleFactory = TupleFactory.getInstance();
+ transient ResourceSchema schema;
+
+ String signature;
+
+ /**
+     * Implements the logic for searching partition keys and applying partition
+ * filtering
+ */
+ transient PathPartitionHelper pathPartitionerHelper = new PathPartitionHelper();
+ transient Map<String, String> currentPathPartitionKeyMap;
+ transient String[] partitionColumns;
+
+ transient JsonMetadata jsonMetadata;
+ transient boolean partitionKeysSet = false;
+
+ LoadFuncHelper loadFuncHelper = null;
+
+ transient Configuration conf;
+ transient Path currentPath;
+
+ String constructorPassedPartitionFilter;
+
+ public AllLoader() {
+ jsonMetadata = new JsonMetadata();
+ }
+
+ public AllLoader(String partitionFilter) {
+ this();
+ LOG.debug("PartitionFilter: " + partitionFilter.toString());
+
+ constructorPassedPartitionFilter = partitionFilter;
+
+ }
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ FileInputFormat.setInputPaths(job, location);
+ // called on the front end
+ conf = job.getConfiguration();
+ loadFuncHelper = new LoadFuncHelper(conf);
+
+ if (constructorPassedPartitionFilter != null) {
+
+ pathPartitionerHelper.setPartitionFilterExpression(
+ constructorPassedPartitionFilter, AllLoader.class,
+ signature);
+
+ }
+
+ getPartitionKeys(location, job);
+ }
+
+ @Override
+ public LoadCaster getLoadCaster() throws IOException {
+ return new Utf8StorageConverter();
+ }
+
+ @Override
+ public AllLoaderInputFormat getInputFormat() throws IOException {
+ // this plugs the AllLoaderInputFormat into the system, which in turn
+ // will plug in the AllRecordReader
+ // the AllRecordReader will select and create the correct LoadFunc
+ return new AllLoaderInputFormat(signature);
+ }
+
+ @Override
+ public void prepareToRead(
+ @SuppressWarnings("rawtypes") RecordReader reader, PigSplit split)
+ throws IOException {
+
+ AllReader allReader = (AllReader) reader;
+
+ if (currentPath == null || !(currentPath.equals(allReader.path))) {
+ currentPathPartitionKeyMap = (partitionColumns == null) ? null
+ : pathPartitionerHelper
+ .getPathPartitionKeyValues(allReader.path
+ .toString());
+ currentPath = allReader.path;
+ }
+
+ childLoadFunc = allReader.prepareLoadFuncForReading(split);
+
+ String projectProperty = getUDFContext().getProperty(PROJECTION_ID);
+
+ if (projectProperty != null) {
+
+ // load the required field list from the current UDF context
+ ByteArrayInputStream input = new ByteArrayInputStream(
+ Base64.decodeBase64(projectProperty.getBytes("UTF-8")));
+
+ ObjectInputStream objInput = new ObjectInputStream(input);
+
+ try {
+ requiredFieldList = (RequiredFieldList) objInput.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new FrontendException(e.toString(), e);
+ } finally {
+ IOUtils.closeStream(objInput);
+ }
+
+            if (childLoadFunc instanceof LoadPushDown) {
+ supportPushDownProjection = true;
+ ((LoadPushDown) childLoadFunc)
+ .pushProjection(requiredFieldList);
+ } else {
+ if (requiredFieldList != null) {
+ requiredFieldHashSet = new TreeSet<Integer>();
+ for (RequiredField requiredField : requiredFieldList
+ .getFields()) {
+ requiredFieldHashSet.add(requiredField.getIndex());
+ }
+ }
+ }
+
+ }
+
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
+        // delegate work to the child load func selected based on the file
+        // type and other criteria.
+        // We support push down projection even when the child LoadFunc does
+        // not, so in this method we need to look at the
+        // supportPushDownProjection flag: if true we use the child's getNext
+        // as is, if not we remove the fields not required by the push down
+        // projection ourselves.
+
+ Tuple tuple = null;
+
+ if (supportPushDownProjection) {
+ tuple = childLoadFunc.getNext();
+ } else if ((tuple = childLoadFunc.getNext()) != null) {
+ // ----- If the function does not support projection we do it here
+
+ if (requiredFieldHashSet != null) {
+
+ Tuple projectedTuple = tupleFactory
+ .newTuple(requiredFieldHashSet.size());
+ int i = 0;
+ int tupleSize = tuple.size();
+
+ for (int index : requiredFieldHashSet) {
+ if (index < tupleSize) {
+ // add the tuple columns
+ projectedTuple.set(i++, tuple.get(index));
+ } else {
+ // add the partition columns
+ projectedTuple.set(i++, currentPathPartitionKeyMap
+ .get(partitionColumns[index - tupleSize]));
+ }
+ }
+
+ tuple = projectedTuple;
+ }
+
+ }
+
+ return tuple;
+ }
+
+ @Override
+ public List<OperatorSet> getFeatures() {
+ return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+ }
+
+ @Override
+ public RequiredFieldResponse pushProjection(
+ RequiredFieldList requiredFieldList) throws FrontendException {
+ // save the required field list to the UDFContext properties.
+
+ Properties properties = getUDFContext();
+
+ ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
+ ObjectOutputStream objOut = null;
+ try {
+ objOut = new ObjectOutputStream(byteArray);
+ objOut.writeObject(requiredFieldList);
+ } catch (IOException e) {
+ throw new FrontendException(e.toString(), e);
+ } finally {
+ IOUtils.closeStream(objOut);
+ }
+
+ // write out the whole required fields list as a base64 string
+ try {
+ properties.setProperty(PROJECTION_ID,
+ new String(Base64.encodeBase64(byteArray.toByteArray()),
+ "UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ throw new FrontendException(e.toString(), e);
+ }
+
+ return new RequiredFieldResponse(true);
+ }
+
+ /**
+ * Tries to determine the LoadFunc by using the LoadFuncHelper to identify a
+ * loader for the first file in the location directory.<br/>
+     * If no LoadFunc can be determined, a FrontendException is thrown.<br/>
+     * If the LoadFunc implements the LoadMetadata interface and returns a
+     * non-null schema, this schema is returned.
+ *
+ * @param location
+ * @param job
+ * @return
+ * @throws IOException
+ */
+ private ResourceSchema getSchemaFromLoadFunc(String location, Job job)
+ throws IOException {
+
+ ResourceSchema schema = null;
+
+ if (loadFuncHelper == null) {
+ loadFuncHelper = new LoadFuncHelper(job.getConfiguration());
+ }
+
+ Path firstFile = loadFuncHelper.determineFirstFile(location);
+
+ if (childLoadFunc == null) {
+
+ // choose loader
+ FuncSpec funcSpec = loadFuncHelper.determineFunction(location,
+ firstFile);
+
+ if (funcSpec == null) {
+ // throw front end exception, no loader could be determined.
+ throw new FrontendException(
+ "No LoadFunction could be determined for " + location);
+ }
+
+ childLoadFunc = (LoadFunc) PigContext
+ .instantiateFuncFromSpec(funcSpec);
+ }
+
+ LOG.debug("Found LoadFunc: " + childLoadFunc.getClass().getName());
+
+ if (childLoadFunc instanceof LoadMetadata) {
+ schema = ((LoadMetadata) childLoadFunc).getSchema(firstFile.toUri()
+ .toString(), job);
+ LOG.debug("Found schema " + schema + " from loadFunc: "
+ + childLoadFunc.getClass().getName());
+ }
+
+ return schema;
+ }
+
+ @Override
+ public ResourceSchema getSchema(String location, Job job)
+ throws IOException {
+
+ if (schema == null) {
+ ResourceSchema foundSchema = jsonMetadata.getSchema(location, job);
+
+ // determine schema from files in location
+ if (foundSchema == null) {
+ foundSchema = getSchemaFromLoadFunc(location, job);
+
+ }
+
+ // only add the partition keys if the schema is not null
+ // we use the partitionKeySet to only set partition keys once.
+ if (!(partitionKeysSet || foundSchema == null)) {
+ String[] keys = getPartitionColumns(location, job);
+
+ if (!(keys == null || keys.length == 0)) {
+
+ // re-edit the pigSchema to contain the new partition keys.
+ ResourceFieldSchema[] fields = foundSchema.getFields();
+
+ LOG.debug("Schema: " + Arrays.toString(fields));
+
+ ResourceFieldSchema[] newFields = Arrays.copyOf(fields,
+ fields.length + keys.length);
+
+ int index = fields.length;
+
+ for (String key : keys) {
+ newFields[index++] = new ResourceFieldSchema(
+ new FieldSchema(key, DataType.CHARARRAY));
+ }
+
+ foundSchema.setFields(newFields);
+
+ LOG.debug("Added partition fields: " + keys
+ + " to loader schema");
+ LOG.debug("Schema is: " + Arrays.toString(newFields));
+ }
+
+ partitionKeysSet = true;
+
+ }
+
+ schema = foundSchema;
+ }
+
+ return schema;
+ }
+
+ @Override
+ public ResourceStatistics getStatistics(String location, Job job)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public void storeStatistics(ResourceStatistics stats, String location,
+ Job job) throws IOException {
+
+ }
+
+ @Override
+ public void storeSchema(ResourceSchema schema, String location, Job job)
+ throws IOException {
+ jsonMetadata.storeSchema(schema, location, job);
+ }
+
+ /**
+ * Reads the partition columns
+ *
+ * @param location
+ * @param job
+ * @return
+ */
+ private String[] getPartitionColumns(String location, Job job) {
+
+ if (partitionColumns == null) {
+ // read the partition columns from the UDF Context first.
+ // if not in the UDF context then read it using the PathPartitioner.
+
+ Properties properties = getUDFContext();
+
+ if (properties == null) {
+ properties = new Properties();
+ }
+
+ String partitionColumnStr = properties
+ .getProperty(PathPartitionHelper.PARTITION_COLUMNS);
+
+ if (partitionColumnStr == null
+ && !(location == null || job == null)) {
+ // if it hasn't been written yet.
+ Set<String> partitionColumnSet;
+
+ try {
+ partitionColumnSet = pathPartitionerHelper
+ .getPartitionKeys(location, job.getConfiguration());
+ } catch (IOException e) {
+
+ RuntimeException rte = new RuntimeException(e);
+ rte.setStackTrace(e.getStackTrace());
+ throw rte;
+
+ }
+
+ if (partitionColumnSet != null) {
+
+ StringBuilder buff = new StringBuilder();
+
+ int i = 0;
+ for (String column : partitionColumnSet) {
+ if (i++ != 0) {
+ buff.append(',');
+ }
+
+ buff.append(column);
+ }
+
+ String buffStr = buff.toString().trim();
+
+ if (buffStr.length() > 0) {
+
+ properties.setProperty(
+ PathPartitionHelper.PARTITION_COLUMNS,
+ buff.toString());
+ }
+
+ partitionColumns = partitionColumnSet
+ .toArray(new String[] {});
+
+ }
+
+ } else {
+            // the partition columns have already been set in the UDF Context
+ if (partitionColumnStr != null) {
+ String split[] = partitionColumnStr.split(",");
+ Set<String> partitionColumnSet = new LinkedHashSet<String>();
+ if (split.length > 0) {
+ for (String splitItem : split) {
+ partitionColumnSet.add(splitItem);
+ }
+ }
+
+ partitionColumns = partitionColumnSet
+ .toArray(new String[] {});
+ }
+
+ }
+
+ }
+
+ return partitionColumns;
+
+ }
+
+ @Override
+ public String[] getPartitionKeys(String location, Job job)
+ throws IOException {
+
+ String[] partitionKeys = getPartitionColumns(location, job);
+
+ if (partitionKeys == null) {
+ throw new NullPointerException("INDUCED");
+ }
+ LOG.info("Get Parition Keys for: " + location + " keys: "
+ + Arrays.toString(partitionKeys));
+
+ return partitionKeys;
+ }
+
+ // --------------- Save Signature and PartitionFilter Expression
+ // ----------------- //
+ @Override
+ public void setUDFContextSignature(String signature) {
+ this.signature = signature;
+ super.setUDFContextSignature(signature);
+ }
+
+ private Properties getUDFContext() {
+ return UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+ new String[] { signature });
+ }
+
+ @Override
+ public void setPartitionFilter(Expression partitionFilter)
+ throws IOException {
+ LOG.debug("PartitionFilter: " + partitionFilter.toString());
+
+ pathPartitionerHelper.setPartitionFilterExpression(
+ partitionFilter.toString(), AllLoader.class, signature);
+
+ }
+
+ /**
+ * InputFormat that encapsulates the correct input format based on the file
+ * type.
+ *
+ */
+ public static class AllLoaderInputFormat extends
+ FileInputFormat<Writable, Writable> {
+
+ transient PathPartitionHelper partitionHelper = new PathPartitionHelper();
+ String udfSignature;
+
+ public AllLoaderInputFormat(String udfSignature) {
+ super();
+ this.udfSignature = udfSignature;
+ }
+
+ @Override
+ protected List<FileStatus> listStatus(JobContext jobContext)
+ throws IOException {
+
+ List<FileStatus> files = partitionHelper.listStatus(jobContext,
+ AllLoader.class, udfSignature);
+
+ if (files == null)
+ files = super.listStatus(jobContext);
+
+ return files;
+
+ }
+
+ @Override
+ public RecordReader<Writable, Writable> createRecordReader(
+ InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+
+            // this method plugs the AllReader into the system, and the
+            // AllReader will, when called, select the correct LoadFunc
+ return new AllReader(udfSignature);
+ }
+
+ }
+
+ /**
+     * Contains the logic for selecting the correct Loader.
+ *
+ */
+ public static class AllReader extends RecordReader<Writable, Writable> {
+
+ LoadFunc selectedLoadFunc;
+ RecordReader<Writable, Writable> selectedReader;
+ LoadFuncHelper loadFuncHelper = null;
+ String udfSignature;
+ Path path;
+
+ public AllReader(String udfSignature) {
+ this.udfSignature = udfSignature;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void initialize(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext) throws IOException,
+ InterruptedException {
+
+ FileSplit fileSplit = (FileSplit) inputSplit;
+
+ path = fileSplit.getPath();
+ String fileName = path.toUri().toString();
+
+ // select the correct load function and initialise
+ loadFuncHelper = new LoadFuncHelper(
+ taskAttemptContext.getConfiguration());
+
+ FuncSpec funcSpec = loadFuncHelper.determineFunction(fileName);
+
+ if (funcSpec == null) {
+ throw new IOException("Cannot determine LoadFunc for "
+ + fileName);
+ }
+
+ selectedLoadFunc = (LoadFunc) PigContext
+ .instantiateFuncFromSpec(funcSpec);
+
+ selectedLoadFunc.setUDFContextSignature(udfSignature);
+ selectedLoadFunc.setLocation(fileName,
+ new Job(taskAttemptContext.getConfiguration(),
+ taskAttemptContext.getJobName()));
+
+ selectedReader = selectedLoadFunc.getInputFormat()
+ .createRecordReader(fileSplit, taskAttemptContext);
+
+ selectedReader.initialize(fileSplit, taskAttemptContext);
+
+ LOG.info("Using LoadFunc " + selectedLoadFunc.getClass().getName()
+ + " on " + fileName);
+
+ }
+
+ // ---------------------- all functions below this line delegate work to
+ // the selectedReader ------------//
+
+ public LoadFunc prepareLoadFuncForReading(PigSplit split)
+ throws IOException {
+
+ selectedLoadFunc.prepareToRead(selectedReader, split);
+ return selectedLoadFunc;
+
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ return selectedReader.nextKeyValue();
+ }
+
+ @Override
+ public Writable getCurrentKey() throws IOException,
+ InterruptedException {
+ return selectedReader.getCurrentKey();
+ }
+
+ @Override
+ public Writable getCurrentValue() throws IOException,
+ InterruptedException {
+ return selectedReader.getCurrentValue();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return selectedReader.getProgress();
+ }
+
+ @Override
+ public void close() throws IOException {
+ selectedReader.close();
+ }
+
+ }
+
+}
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/allloader/LoadFuncHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/allloader/LoadFuncHelper.java?rev=1035726&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/allloader/LoadFuncHelper.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/allloader/LoadFuncHelper.java Tue Nov 16 18:23:23 2010
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.storage.allloader;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+
+/**
+ *
+ * Contains the logic for finding a LoadFunc based on the definition of the
+ * file.extension.loaders property.
+ *
+ */
+public class LoadFuncHelper {
+
+ public static final String FILE_EXTENSION_LOADERS = "file.extension.loaders";
+
+ /**
+     * The most common file formats are supported, i.e. SEQ, GZ, BZ2, LZO.
+     * This map is used when a file does not have an extension: the first 3
+     * bytes are read from the file to determine its content type, and are
+     * then mapped to an extension for which an entry must exist in
+     * file.extension.loaders. If the content does not match any entry in
+     * magicNumberExtensionMap, plain text is assumed.
+ */
+ private static Map<MagicNumber, String> magicNumberExtensionMap = buildMagicNumberExtensionMap();
+
+ Configuration conf;
+
+ FileSystem fileSystem;
+
+ /**
+ * Stores the extension:tag = load function pairs
+ */
+ Map<String, String> loadFunctionExtensionTagMap;
+
+ /**
+ * Stores the extension = tags pairs
+ */
+ Map<String, Set<String>> extensionTagsMap;
+
+ public LoadFuncHelper(Configuration conf) throws IOException {
+ this.conf = conf;
+ fileSystem = FileSystem.get(conf);
+
+ loadFunctionExtensionTagMap = new HashMap<String, String>();
+ extensionTagsMap = new HashMap<String, Set<String>>();
+
+ String fileExtensionLoaders = conf.get(FILE_EXTENSION_LOADERS);
+
+ if (fileExtensionLoaders != null) {
+ String[] loaderExtensionPairs = fileExtensionLoaders.split("\\),");
+ for (String loaderExtensionPairStr : loaderExtensionPairs) {
+
+ String[] loaderExtensionPair = loaderExtensionPairStr
+ .split(":");
+ if (loaderExtensionPair.length == 2) {
+ // we have extension:loader assign EMPTY TAG
+ loadFunctionExtensionTagMap.put(
+ loaderExtensionPair[0].trim() + ":",
+ loaderExtensionPair[1].trim());
+ } else if (loaderExtensionPair.length == 3
+ || loaderExtensionPair.length == 4) {
+ // we have extension:pathTag:loader assign TAG
+ String ext = loaderExtensionPair[0].trim();
+ String tag = loaderExtensionPair[1].trim();
+
+ String key = ext + ":" + tag;
+
+ String loadFunc = loaderExtensionPair[2].trim();
+ // support key class names for sequence files
+ if (loaderExtensionPair.length == 4) {
+ // loadFunc here is not loadFunc but the sequence file
+ // key class
+ key += ":" + loadFunc;
+ loadFunc = loaderExtensionPair[3].trim();
+ }
+
+ loadFunctionExtensionTagMap.put(key, loadFunc);
+
+ Set<String> tags = extensionTagsMap.get(ext);
+ if (tags == null) {
+ tags = new TreeSet<String>();
+ extensionTagsMap.put(ext, tags);
+ }
+
+ tags.add(tag);
+
+ } else {
+ throw new FrontendException(
+ "Bad formatted file.extension.loaders string, format is <extension>:<loader>,<extenion><loader>");
+ }
+ }
+ }
+
+ }
+
+ /**
+ *
+ * @return
+ */
+ private static Map<MagicNumber, String> buildMagicNumberExtensionMap() {
+ Map<MagicNumber, String> magicNumberExtensionMap = new HashMap<MagicNumber, String>();
+ magicNumberExtensionMap.put(new MagicNumber(new byte[] { 83, 69, 81 }),
+ "seq");
+ magicNumberExtensionMap.put(
+ new MagicNumber(new byte[] { -119, 76, 90 }), "lzo");
+ magicNumberExtensionMap.put(
+ new MagicNumber(new byte[] { 31, -117, 8 }), "gz");
+ magicNumberExtensionMap.put(
+ new MagicNumber(new byte[] { 66, 90, 104 }), "bz2");
+
+ return magicNumberExtensionMap;
+ }
+
+ /**
+ * If location is a directory the first file found is returned
+ *
+ * @param location
+ * @return
+ * @throws IOException
+ * if no file is found a FrontendException is thrown
+ */
+ public Path determineFirstFile(String location) throws IOException {
+ Path path = new Path(location);
+ FileStatus status = fileSystem.getFileStatus(path);
+
+        if (status.isDir()) {
+            // get the first file.
+            Path firstFile = getFirstFile(fileSystem, path);
+
+            if (firstFile == null) {
+                throw new FrontendException(path + " has no files");
+            }
+
+            path = firstFile;
+        }
+
+ return path;
+ }
+
+ /**
+ *
+ * If location is a directory the first file found in the directory is used.<br/>
+ * The file extension of the file will be searched against the
+ * file.extension.loaders mappings. If none found null is returned.
+ *
+ * @param location
+ * @return
+ * @throws IOException
+ */
+ public FuncSpec determineFunction(String location) throws IOException {
+ return determineFunction(location, determineFirstFile(location));
+ }
+
+ /**
+ *
+ * The file extension of the file will be searched against the
+ * file.extension.loaders mappings. If none found null is returned.
+ *
+     * @param location
+     * @param path
+ * @return
+ * @throws IOException
+ */
+ public FuncSpec determineFunction(String location, Path path)
+ throws IOException {
+
+ String fileName = path.getName();
+
+ FuncSpec funcSpec = getLoadPerExtension(fileName, path);
+
+ if (funcSpec == null) {
+ // look for loaders by the content definition
+
+ funcSpec = getFuncSpecFromContent(path);
+
+ }
+
+ return funcSpec;
+ }
+
+ /**
+     * Tries to identify the extension and thereby the loader from the content
+ * type.
+ *
+ * @param path
+ * @return
+ * @throws IOException
+ */
+ private FuncSpec getFuncSpecFromContent(Path path) throws IOException {
+ // get the first three bytes from the file.
+        byte[] magic = new byte[3];
+        int read = -1;
+
+        FSDataInputStream dataIn = fileSystem.open(path, 3);
+        try {
+            read = dataIn.read(magic);
+        } finally {
+            dataIn.close();
+        }
+
+ FuncSpec funcSpec = null;
+ String extensionMapping = magicNumberExtensionMap.get(new MagicNumber(
+ magic));
+
+ if (read < magic.length || extensionMapping == null) {
+ // assume plain text
+ funcSpec = new FuncSpec("PigStorage()");
+ } else {
+ // an extension mapping was found. i.e. this is a GZ, BZ2, LZO or
+ // SEQ file
+
+ String applicableTag = getApplicableTag(extensionMapping, path);
+ String loadFuncDefinition = null;
+
+ if (extensionMapping.equals("seq")) {
+ // if this is a sequence file we load the key class also
+ loadFuncDefinition = loadFunctionExtensionTagMap
+ .get(extensionMapping + ":" + applicableTag + ":"
+ + getSequenceFileKeyClass(path));
+
+ }
+
+            // we do this also for sequence files because a sequence file
+            // might or might not have a key class associated in the
+            // extension mapping; in both cases, if the key class is not
+            // found above in the mapping, the default sequence file loader
+            // needs to be used as per the extension mapping.
+ if (loadFuncDefinition == null) {
+ // use only extension and tag filtering
+ loadFuncDefinition = loadFunctionExtensionTagMap
+ .get(extensionMapping + ":" + applicableTag);
+
+ }
+
+ if (loadFuncDefinition == null) {
+                // if still null, throw an error
+ throw new RuntimeException("Cannot find loader for " + path
+ + " extension mapping " + extensionMapping);
+ }
+
+ funcSpec = new FuncSpec(loadFuncDefinition);
+ }
+
+ return funcSpec;
+ }
+
+ /**
+ * Open a SequenceFile.Reader instance and return the keyClassName
+ *
+ * @param path
+ * @return
+ * @throws IOException
+ */
+ private String getSequenceFileKeyClass(Path path) throws IOException {
+
+ String keyClassName = null;
+ SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, path,
+ conf);
+
+ try {
+
+ keyClassName = reader.getKeyClassName();
+ int index = keyClassName.indexOf("$");
+ if (index > 0) {
+ keyClassName = keyClassName.substring(0, index);
+ }
+
+ } finally {
+ reader.close();
+ }
+
+ return keyClassName;
+ }
+
+ /**
+ * Search for the correct loader based on the extension and tags mappings.
+ *
+ * @param fileName
+ * @param path
+ * @return
+ */
+ private FuncSpec getLoadPerExtension(String fileName, Path path) {
+
+ String extension = null;
+ String applicableTag = null;
+ String loadFuncDefinition = null;
+ FuncSpec funcSpec = null;
+
+        // NOTE: the inverse logic !(a == null && b == null) is not used
+        // because we want all statements to be checked as long as they are
+        // not null.
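+        // Walk the file name from the last extension inwards, e.g. for
+        // file.rc.gz the extension "gz" is tried first; if no loader is
+        // mapped for it, the name is cut down to file.rc and "rc" is tried
+        // next.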
+ while (fileName != null && (extension = getExtension(fileName)) != null
+ && (applicableTag = getApplicableTag(extension, path)) != null) {
+
+ if ((loadFuncDefinition = loadFunctionExtensionTagMap.get(extension
+ + ":" + applicableTag)) != null) {
+ // create the LoadFunc
+ funcSpec = new FuncSpec(loadFuncDefinition);
+ break;
+ }
+
+ fileName = cutExtension(fileName);
+ }
+
+ return funcSpec;
+ }
+
+ /**
+ * Searches in the path for the first occurrence of the tags associated with
+ * the extension.<br/>
+ * If this extension has no tags an empty string is returned.<br/>
+ * If it has tags and no tag is found in the path null is returned.<br/>
+ *
+ * @param extension
+ * @param path
+ * @return
+ */
+ private String getApplicableTag(String extension, Path path) {
+
+ Set<String> tags = extensionTagsMap.get(extension);
+ String applicableTag = null;
+
+ if (tags != null) {
+
+ String fullPathName = path.toUri().toString();
+
+ for (String tag : tags) {
+ if (fullPathName.contains(tag)) {
+ applicableTag = tag;
+ break;
+ }
+ }
+
+ } else {
+ applicableTag = "";
+ }
+
+ return applicableTag;
+ }
+
+ /**
+ * @param fileName
+     * @return String the file name without the last extension e.g.
+ * file.rc.gz will return file.rc
+ */
+ private static String cutExtension(String fileName) {
+ String name = null;
+
+ int index = fileName.lastIndexOf('.');
+ if (index > 0 && index < fileName.length()) {
+ name = fileName.substring(0, index);
+ }
+
+ return name;
+ }
+
+ /**
+ *
+ * @param fileName
+     * @return String the last file name extension e.g. file.rc.gz will
+ * return gz
+ */
+ private static String getExtension(String fileName) {
+
+ String extension = null;
+
+ int index = fileName.lastIndexOf('.');
+ int pos = index + 1;
+ if (index > 0 && pos < fileName.length()) {
+ extension = fileName.substring(pos, fileName.length());
+ }
+
+ return extension;
+ }
+
+ /**
+ * Looks for and returns the first file it can find.
+ *
+     * @return Path null if no file was found
+ * @throws IOException
+ */
+ private static Path getFirstFile(FileSystem fileSystem, Path path)
+ throws IOException {
+ Path currentPath = path;
+ Path file = null;
+
+ FileStatus[] paths = fileSystem.listStatus(currentPath);
+
+ for (FileStatus subPathStatus : paths) {
+
+ currentPath = subPathStatus.getPath();
+
+ // if hidden file skip.
+ if (currentPath.getName().startsWith(".")
+ || currentPath.getName().startsWith("_")) {
+ continue;
+ }
+
+            if (subPathStatus.isDir()) {
+                file = getFirstFile(fileSystem, currentPath);
+                if (file != null) {
+                    // first file found in a sub directory, return.
+                    break;
+                }
+            } else {
+                // first file found return.
+                file = currentPath;
+                break;
+            }
+ }
+
+ return file;
+ }
+
+ static class MagicNumber {
+
+ byte[] magic;
+
+ public MagicNumber(byte[] magic) {
+ super();
+ this.magic = magic;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + Arrays.hashCode(magic);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ MagicNumber other = (MagicNumber) obj;
+ if (!Arrays.equals(magic, other.magic))
+ return false;
+ return true;
+ }
+
+ }
+}
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAllLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAllLoader.java?rev=1035726&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAllLoader.java (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAllLoader.java Tue Nov 16 18:23:23 2010
@@ -0,0 +1,694 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.piggybank.storage.AllLoader;
+import org.apache.pig.piggybank.storage.HiveColumnarLoader;
+import org.apache.pig.piggybank.storage.allloader.LoadFuncHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
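+/**
+ * Tests that the AllLoader selects the correct child LoadFunc for
+ * directories containing mixed file formats (gzip plain text and HiveRC),
+ * covering simple, date partitioned, logic partitioned, tagged and
+ * content-based (extension-less) layouts.
+ */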
+public class TestAllLoader extends TestCase {
+
+ enum TYPE {
+ HIVERC(".rc", new HiveRCFileTestWriter()), GZIP_PLAIN(".gz",
+ new GzipFileTestWriter());
+
+ String extension;
+ FileTestWriter writer;
+
+ TYPE(String extension, FileTestWriter writer) {
+ this.extension = extension;
+ this.writer = writer;
+ }
+
+ }
+
+ String colSeparator = ",";
+
+ /**
+ * All test files will contain this amount of records
+ */
+ int fileRecords = 100;
+ /**
+ * All files will contain this amount of columns in each row
+ */
+ int fileColumns = 2;
+
+ final TYPE fileTypes[] = new TYPE[] { TYPE.GZIP_PLAIN, TYPE.HIVERC };
+
+ final String extensionLoaders = "gz:org.apache.pig.builtin.PigStorage('"
+ + colSeparator + "'), rc:"
+ + HiveColumnarLoader.class.getCanonicalName()
+ + "('v1 float,v2 float')";
+
+ File baseDir;
+
+ File simpleDir;
+
+    // --------------- For date partitioning based on daydate= (this will work
+    // for year=, day=, etc.)
+ File datePartitionDir;
+ final String[] datePartitions = new String[] { "daydate=2010-11-01",
+ "daydate=2010-11-02", "daydate=2010-11-03", "daydate=2010-11-04",
+ "daydate=2010-11-05" };
+
+ // -------------- Logic partitioning that does not involve dates
+ File logicPartitionDir;
+ final String[] logicPartitions = new String[] { "block=1", "block=2",
+ "block=3" };
+
+    // -------------- Logic Tagged partitioning that is /type1/block=1,
+    // /type2/block=2.
+    // This is needed because the Schema for each path might be different and
+    // we want to use a different loader based on whether we are looking in
+    // type1 or in type2; see taggedExtensionLoaders
+ File taggedLogicPartitionDir;
+ final String taggedExtensionLoaders = "gz:org.apache.pig.builtin.PigStorage('"
+ + colSeparator
+ + "'), rc:type1:"
+ + HiveColumnarLoader.class.getCanonicalName()
+ + "('v1 float,v2 float'), rc:type2:"
+ + HiveColumnarLoader.class.getCanonicalName() + "('v1 float')";
+
+ final String[] tags = new String[] { "type1", "type2" };
+ final String[] taggedSchemas = new String[] { "(p: float, q:float)",
+ "(p:float)" };
+
+ // -------------- Test Load Files by Content
+ File filesByContentDir;
+ final String contentLoaders = "gz:org.apache.pig.builtin.PigStorage('"
+ + colSeparator
+ + "'), rc:"
+ + HiveColumnarLoader.class.getCanonicalName()
+ + "('v1 float,v2 float'), seq::org.apache.hadoop.hive.ql.io.RCFile:"
+ + HiveColumnarLoader.class.getCanonicalName()
+ + "('v1 float,v2 float')";
+
+ Properties configuration;
+
+ final String allLoaderName = AllLoader.class.getCanonicalName();
+ final String allLoaderFuncSpec = allLoaderName + "()";
+
+ PigServer server;
+
+ /**
+ * Test that we can load files with the correct loaders as per file content.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testFilesByContentDir() throws IOException {
+
+ server.shutdown();
+
+ configuration.setProperty(LoadFuncHelper.FILE_EXTENSION_LOADERS,
+ contentLoaders);
+
+ server = new PigServer(ExecType.LOCAL, configuration);
+
+ server.setBatchOn();
+ server.registerFunction(allLoaderName, new FuncSpec(allLoaderFuncSpec));
+
+ server.registerQuery("a = LOAD '" + filesByContentDir.getAbsolutePath()
+ + "' using " + allLoaderFuncSpec + " as (p:float, q:float);");
+
+ readRecordsFromLoader(server, "a", fileRecords);
+
+ }
+
+ /**
+ * Test that we can read a tagged logic partitioned directory
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testTaggedLogicPartitionDir() throws IOException {
+
+ server.shutdown();
+
+ configuration.setProperty(LoadFuncHelper.FILE_EXTENSION_LOADERS,
+ taggedExtensionLoaders);
+
+ server = new PigServer(ExecType.LOCAL, configuration);
+
+ server.setBatchOn();
+ server.registerFunction(allLoaderName, new FuncSpec(allLoaderFuncSpec));
+
+ int i = 0;
+ for (String tag : tags) {
+
+ String schema = taggedSchemas[i++];
+
+ server.registerQuery(tag + " = LOAD '"
+ + taggedLogicPartitionDir.getAbsolutePath() + "/" + tag
+ + "' using " + allLoaderFuncSpec + " as " + schema + ";");
+
+ readRecordsFromLoader(server, tag, fileRecords
+ * logicPartitions.length * fileTypes.length);
+ }
+
+ }
+
+ /**
+ * Test that we can filter by a logic partition based on a range, in this
+ * case block<=2
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testLogicPartitionFilter() throws IOException {
+
+ server.registerQuery("a = LOAD '" + logicPartitionDir.getAbsolutePath()
+ + "' using " + allLoaderName + "('block<=2')"
+ + " as (q:float, p:float);");
+
+ server.registerQuery("r = FOREACH a GENERATE q, p;");
+
+ Iterator<Tuple> it = server.openIterator("r");
+
+ int count = 0;
+
+ while (it.hasNext()) {
+ count++;
+ Tuple t = it.next();
+ // System.out.println(count + " : " + t.toDelimitedString(","));
+ assertEquals(2, t.size());
+
+ }
+
+        // only 2 partitions are used in the query; block=3 is filtered out.
+ assertEquals(fileRecords * 2 * fileTypes.length, count);
+
+ }
+
+ /**
+ * Test that we can extract only the partition column
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testLogicPartitionPartitionColumnExtract() throws IOException {
+
+ server.registerQuery("a = LOAD '" + logicPartitionDir.getAbsolutePath()
+ + "' using " + allLoaderFuncSpec
+ + " as (q:float, p:float, block:chararray);");
+
+ server.registerQuery("r = foreach a generate block;");
+
+ Iterator<Tuple> it = server.openIterator("r");
+
+ int count = 0;
+ Map<String, AtomicInteger> partitionCount = new HashMap<String, AtomicInteger>();
+
+ while (it.hasNext()) {
+ count++;
+ Tuple t = it.next();
+
+ assertEquals(1, t.size());
+ String key = t.get(0).toString();
+
+ AtomicInteger ati = partitionCount.get(key);
+ if (ati == null) {
+ ati = new AtomicInteger(1);
+ partitionCount.put(key, ati);
+ } else {
+ ati.incrementAndGet();
+ }
+
+ }
+
+        // test that all partitions were read
+ for (AtomicInteger ati : partitionCount.values()) {
+ assertEquals(fileRecords * fileTypes.length, ati.get());
+ }
+
+ assertEquals(fileRecords * logicPartitions.length * fileTypes.length,
+ count);
+
+ }
+
+ /**
+ * Test that we can read a logic partitioned directory
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testLogicPartitionDir() throws IOException {
+
+ server.registerQuery("a = LOAD '" + logicPartitionDir.getAbsolutePath()
+ + "' using " + allLoaderFuncSpec + " as (q:float, p:float);");
+
+ readRecordsFromLoader(server, "a", fileRecords * logicPartitions.length
+ * fileTypes.length);
+
+ }
+
+ /**
+ * Test that we can filter a date partitioned directory by a date range
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testDateParitionFilterWithAsSchema() throws IOException {
+
+ server.registerQuery("a = LOAD '"
+ + datePartitionDir.getAbsolutePath()
+ + "' using "
+ + allLoaderName
+ + "('daydate >= \"2010-11-02\" and daydate <= \"2010-11-04\"') AS (q:float, p:float, daydate:chararray); ");
+
+ server.registerQuery("r = FOREACH a GENERATE $0;");
+
+ Iterator<Tuple> it = server.openIterator("r");
+
+ int count = 0;
+
+ while (it.hasNext()) {
+ count++;
+ Tuple t = it.next();
+
+ assertEquals(1, t.size());
+
+ }
+        // we filter out 2 of the 5 date partitions, using only 3
+ readRecordsFromLoader(server, "a", fileRecords * 3 * fileTypes.length);
+ }
+
+ /**
+ * Test that we can filter a date partitioned directory by a date range
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testDateParitionFilterWithoutSchema() throws IOException {
+
+ server.registerQuery("a = LOAD '"
+ + datePartitionDir.getAbsolutePath()
+ + "' using "
+ + allLoaderName
+ + "('daydate >= \"2010-11-02\" and daydate <= \"2010-11-04\"'); ");
+
+ server.registerQuery("r = FOREACH a GENERATE $0;");
+
+ Iterator<Tuple> it = server.openIterator("r");
+
+ int count = 0;
+
+ while (it.hasNext()) {
+ count++;
+ Tuple t = it.next();
+
+ float f = Float.valueOf(t.get(0).toString());
+ assertTrue(f < 1L);
+ assertEquals(1, t.size());
+
+ }
+        // we filter out 2 of the 5 date partitions, using only 3
+ readRecordsFromLoader(server, "a", fileRecords * 3 * fileTypes.length);
+ }
+
+ /**
+ * Test that we can read a date partitioned directory
+ *
+ * @throws IOException
+ * @throws ParseException
+ */
+ @Test
+ public void testDateParitionDir() throws IOException, ParseException {
+
+ server.registerQuery("a = LOAD '" + datePartitionDir.getAbsolutePath()
+ + "' using " + allLoaderFuncSpec
+ + " as (q:float, p:float, daydate:chararray);");
+
+ server.registerQuery("r = FOREACH a GENERATE daydate;");
+
+ Iterator<Tuple> it = server.openIterator("r");
+
+ DateFormat dateformat = new SimpleDateFormat("yyyy-MM-dd");
+
+ Calendar cal = Calendar.getInstance();
+
+ while (it.hasNext()) {
+ Date date = dateformat.parse(it.next().get(0).toString());
+ cal.setTime(date);
+ assertEquals(2010, cal.get(Calendar.YEAR));
+            assertEquals(10, cal.get(Calendar.MONTH)); // month starts at 0 so
+            // November is 10
+
+ }
+
+ readRecordsFromLoader(server, "a", fileRecords * datePartitions.length
+ * fileTypes.length);
+ }
+
+ /**
+ * Test that we can read from a simple directory
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testSimpleDir() throws IOException {
+
+ server.registerQuery("a = LOAD '" + simpleDir.getAbsolutePath()
+ + "' using " + allLoaderFuncSpec + " as (q:float, p:float);");
+
+ readRecordsFromLoader(server, "a", fileRecords * fileTypes.length);
+
+ }
+
+ /**
+     * Validates that the loadAlias can read the correct number of records
+ *
+ * @param server
+ * @param loadAlias
+ * @throws IOException
+ */
+ private void readRecordsFromLoader(PigServer server, String loadAlias,
+ int totalRowCount) throws IOException {
+
+ Iterator<Tuple> result = server.openIterator(loadAlias);
+ int count = 0;
+
+        while (result.hasNext()) {
+            result.next();
+            count++;
+        }
+
+ Log.info("Validating expected: " + totalRowCount + " against " + count);
+ assertEquals(totalRowCount, count);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+
+ Configuration hadoopConf = new Configuration();
+ FileSystem.setDefaultUri(hadoopConf,
+ LocalFileSystem.getDefaultUri(hadoopConf));
+
+ if (baseDir == null) {
+ configuration = new Properties();
+ configuration.setProperty(LoadFuncHelper.FILE_EXTENSION_LOADERS,
+ extensionLoaders);
+
+ baseDir = new File("build/test/testAllLoader");
+ if (baseDir.exists()) {
+ FileUtil.fullyDelete(baseDir);
+ }
+
+ assertTrue(baseDir.mkdirs());
+
+ createSimpleDir();
+ createDatePartitionDir();
+ createLogicPartitionDir();
+ createTaggedLogicPartitionDir();
+ createFileByContentDir();
+
+ server = new PigServer(ExecType.LOCAL, configuration);
+
+ server.setBatchOn();
+ server.registerFunction(allLoaderName, new FuncSpec(
+ allLoaderFuncSpec));
+ }
+ }
+
+ /**
+     * Write out all files without their extensions
+ *
+ * @throws IOException
+ */
+ private void createFileByContentDir() throws IOException {
+
+ filesByContentDir = new File(baseDir, "filesByContentDir");
+ assertTrue(filesByContentDir.mkdirs());
+
+ // for each type create the files without its extension
+
+ File uniqueFile = new File(filesByContentDir, ""
+ + System.currentTimeMillis());
+ TYPE.HIVERC.writer.writeTestData(uniqueFile, fileRecords, fileColumns,
+ colSeparator);
+
+ }
+
+ /**
+     * Write out the tagged logical partitioning directories e.g. type1/block=1
+ *
+ * @throws IOException
+ */
+ private void createTaggedLogicPartitionDir() throws IOException {
+ taggedLogicPartitionDir = new File(baseDir, "taggedLogicPartitionDir");
+ assertTrue(taggedLogicPartitionDir.mkdirs());
+
+ // for each tag create a directory
+ for (String tag : tags) {
+ // for each logic partition create a directory
+ File tagDir = new File(taggedLogicPartitionDir, tag);
+ for (String partition : logicPartitions) {
+ File logicPartition = new File(tagDir, partition);
+
+ assertTrue(logicPartition.mkdirs());
+
+ for (TYPE fileType : fileTypes) {
+ writeFile(logicPartition, fileType);
+ }
+ }
+
+ }
+ }
+
+ /**
+ * Write out logical partitioned directories with one file per fileType:TYPE
+ *
+ * @throws IOException
+ */
+ private void createLogicPartitionDir() throws IOException {
+
+ logicPartitionDir = new File(baseDir, "logicPartitionDir");
+ assertTrue(logicPartitionDir.mkdirs());
+
+ // for each logic partition create a directory
+ for (String partition : logicPartitions) {
+ File logicPartition = new File(logicPartitionDir, partition);
+
+ assertTrue(logicPartition.mkdirs());
+
+ for (TYPE fileType : fileTypes) {
+ writeFile(logicPartition, fileType);
+ }
+ }
+
+ }
+
+ /**
+ * Write out date partitioned directories with one file per fileType:TYPE
+ *
+ * @throws IOException
+ */
+ private void createDatePartitionDir() throws IOException {
+
+ datePartitionDir = new File(baseDir, "dateParitionDir");
+ assertTrue(datePartitionDir.mkdirs());
+
+ // for each date partition create a directory
+ for (String partition : datePartitions) {
+ File datePartition = new File(datePartitionDir, partition);
+
+ assertTrue(datePartition.mkdirs());
+
+ for (TYPE fileType : fileTypes) {
+ writeFile(datePartition, fileType);
+ }
+ }
+
+ }
+
+ /**
+ * Write out a simple directory with one file per fileType:TYPE
+ *
+ * @throws IOException
+ */
+ private void createSimpleDir() throws IOException {
+
+ simpleDir = new File(baseDir, "simpleDir");
+ assertTrue(simpleDir.mkdirs());
+
+ for (TYPE fileType : fileTypes) {
+ writeFile(simpleDir, fileType);
+ }
+
+ }
+
+ /**
+ * Create a unique file name with format
+     * [currentTimeInMillis][type.extension]
+ *
+ * @param dir
+ * @param type
+ * @throws IOException
+ */
+ private void writeFile(File dir, TYPE type) throws IOException {
+ File uniqueFile = new File(dir, System.currentTimeMillis()
+ + type.extension);
+ type.writer.writeTestData(uniqueFile, fileRecords, fileColumns,
+ colSeparator);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+
+ server.shutdown();
+
+ FileUtil.fullyDelete(baseDir);
+ baseDir = null;
+
+ }
+
+ /**
+     * Simple interface to help with writing test data.
+ *
+ */
+ private static interface FileTestWriter {
+ void writeTestData(File file, int recordCounts, int columnCount,
+ String colSeparator) throws IOException;
+ }
+
+ /**
+ * Write Gzip Content
+ *
+ */
+ private static class GzipFileTestWriter implements FileTestWriter {
+
+ @Override
+ public void writeTestData(File file, int recordCounts, int columnCount,
+ String colSeparator) throws IOException {
+
+ // write random test data
+ GzipCodec gzipCodec = new GzipCodec();
+ CompressionOutputStream out = gzipCodec
+ .createOutputStream(new FileOutputStream(file));
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
+ out));
+
+ try {
+
+ for (int r = 0; r < recordCounts; r++) {
+ // foreach row write n columns
+
+ for (int c = 0; c < columnCount; c++) {
+
+ if (c != 0) {
+ writer.append(colSeparator);
+ }
+
+ writer.append(String.valueOf(Math.random()));
+
+ }
+ writer.append("\n");
+
+ }
+
+ } finally {
+ writer.close();
+ out.close();
+ }
+
+ }
+
+ }
+
+ private static class HiveRCFileTestWriter implements FileTestWriter {
+
+ @Override
+ public void writeTestData(File file, int recordCounts, int columnCount,
+ String colSeparator) throws IOException {
+
+ // write random test data
+
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+
+ RCFileOutputFormat.setColumnNumber(conf, columnCount);
+ RCFile.Writer writer = new RCFile.Writer(fs, conf, new Path(
+ file.getAbsolutePath()));
+
+ BytesRefArrayWritable bytes = new BytesRefArrayWritable(columnCount);
+
+ for (int c = 0; c < columnCount; c++) {
+ bytes.set(c, new BytesRefWritable());
+ }
+
+ try {
+
+ for (int r = 0; r < recordCounts; r++) {
+ // foreach row write n columns
+
+ for (int c = 0; c < columnCount; c++) {
+
+ byte[] stringbytes = String.valueOf(Math.random())
+ .getBytes();
+ bytes.get(c).set(stringbytes, 0, stringbytes.length);
+
+ }
+
+ writer.append(bytes);
+
+ }
+
+ } finally {
+ writer.close();
+ }
+
+ }
+
+ }
+
+}
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLoadFuncHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLoadFuncHelper.java?rev=1035726&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLoadFuncHelper.java (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLoadFuncHelper.java Tue Nov 16 18:23:23 2010
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.piggybank.storage.allloader.LoadFuncHelper;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *
+ * Tests that LoadFuncHelper's public methods return the required values.
+ */
+public class TestLoadFuncHelper extends TestCase {
+
+ private static final String LOADER = "org.apache.pig.builtin.PigStorage()";
+ private static final String extensionLoaders = "txt:" + LOADER;
+ Configuration configuration;
+ LoadFuncHelper helper = null;
+
+ File baseDir;
+
+ File testFile;
+
+ @Test
+ public void testDetermineFunctionSingleArg() throws IOException {
+
+ FuncSpec funcSpec = helper.determineFunction(baseDir.getAbsolutePath());
+
+ assertNotNull(funcSpec);
+ assertEquals(LOADER, funcSpec.toString() + "()");
+
+ }
+
+ @Test
+ public void testDetermineFunction() throws IOException {
+
+ Path firstFile = helper.determineFirstFile(baseDir.getAbsolutePath());
+ FuncSpec funcSpec = helper.determineFunction(baseDir.getAbsolutePath(),
+ firstFile);
+
+ assertNotNull(funcSpec);
+ assertEquals(LOADER, funcSpec.toString() + "()");
+
+ }
+
+ @Test
+ public void testDetermineFirstFile() throws IOException {
+
+ Path path = helper.determineFirstFile(baseDir.getAbsolutePath());
+
+ assertNotNull(path);
+ assertEquals(testFile.getAbsolutePath(), path.toUri().toURL().getFile());
+
+ }
+
+ @Before
+ public void setUp() throws Exception {
+
+ if (baseDir == null) {
+
+            // we need this here for some strange reason: while running
+            // "ant test" the FileSystem.get call in the LoadFuncHelper
+            // would otherwise try to connect to localhost
+ PigServer pig = new PigServer(ExecType.LOCAL);
+
+ configuration = new Configuration(false);
+
+ configuration.set(LoadFuncHelper.FILE_EXTENSION_LOADERS,
+ extensionLoaders);
+
+ helper = new LoadFuncHelper(configuration);
+
+ baseDir = new File("build/test/testLoadFuncHelper");
+ if (baseDir.exists()) {
+ FileUtil.fullyDelete(baseDir);
+ }
+
+ assertTrue(baseDir.mkdirs());
+
+ testFile = new File(baseDir, "testFile.txt");
+ FileWriter writer = new FileWriter(testFile);
+ try {
+ for (int i = 0; i < 100; i++) {
+ writer.append("test test\n");
+ }
+ } finally {
+ writer.close();
+ }
+
+ }
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ FileUtil.fullyDelete(baseDir);
+ baseDir = null;
+ }
+
+}