You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2010/11/16 19:23:23 UTC

svn commit: r1035726 - in /pig/trunk/contrib: ./ piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ piggybank/java/src/main/java/org/apache/pig/piggybank/storage/allloader/ piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/

Author: gates
Date: Tue Nov 16 18:23:23 2010
New Revision: 1035726

URL: http://svn.apache.org/viewvc?rev=1035726&view=rev
Log:
PIG-1722 PiggyBank AllLoader - Load multiple file formats in one load statement.

Added:
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/AllLoader.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/allloader/
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/allloader/LoadFuncHelper.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAllLoader.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLoadFuncHelper.java
Modified:
    pig/trunk/contrib/CHANGES.txt

Modified: pig/trunk/contrib/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/CHANGES.txt?rev=1035726&r1=1035725&r2=1035726&view=diff
==============================================================================
--- pig/trunk/contrib/CHANGES.txt (original)
+++ pig/trunk/contrib/CHANGES.txt Tue Nov 16 18:23:23 2010
@@ -24,6 +24,9 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1722 PiggyBank AllLoader - Load multiple file formats in one load
+statement (gerritjw via gates)
+
 PIG-1502 Remove Owl as a contrib project (gates)
 
 PIG-1386 UDF to extend functionalities of MaxTupleBy1stField (hcbusy via olgan)

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/AllLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/AllLoader.java?rev=1035726&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/AllLoader.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/AllLoader.java Tue Nov 16 18:23:23 2010
@@ -0,0 +1,730 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.log4j.Logger;
+import org.apache.pig.Expression;
+import org.apache.pig.FileInputLoadFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.piggybank.storage.allloader.LoadFuncHelper;
+import org.apache.pig.piggybank.storage.partition.PathPartitionHelper;
+
+/**
+ * The AllLoader provides the ability to point pig at a folder that contains
+ * files in multiple formats e.g. PlainText, Gz, Bz, Lzo, HiveRC etc and have
+ * the LoadFunc(s) automatically selected based on the file extension. <br/>
+ * <b>How this works:</b><br/>
+ * The file extensions are mapped in the pig.properties via the property
+ * file.extension.loaders.
+ * 
+ * <p/>
+ * <b>file.extension.loaders format</b>
+ * <ul>
+ * <li>[file extension]:[loader func spec]</li>
+ * <li>[file-extension]:[optional path tag]:[loader func spec]</li>
+ * <li>[file-extension]:[optional path tag]:[sequence file key value writer
+ * class name]:[loader func spec]</li>
+ * </ul>
+ * 
+ * <p/>
+ * The file.extension.loaders property associates pig loaders with file
+ * extensions. If a file does not have an extension the AllLoader will look at
+ * the first three bytes of the file and try to guess its format based on:
+ * <ul>
+ * <li>[ -119, 76, 90 ] = lzo</li>
+ * <li>[ 31, -117, 8 ] = gz</li>
+ * <li>[ 66, 90, 104 ] = bz2</li>
+ * <li>[ 83, 69, 81 ] = seq</li>
+ * </ul>
+ * <br/>
+ * The loader associated with that extension will then be used.
+ * <p/>
+ * 
+ * <b>Path partitioning</b> The AllLoader supports hive style path partitioning
+ * e.g. /log/type1/daydate=2010-11-01<br/>
+ * "daydate" will be considered a partition key and filters can be written
+ * against this.<br/>
+ * Note that the filter should go into the AllLoader constructor e.g.<br/>
+ * a = LOAD 'input' using AllLoader('daydate<\"2010-11-01\"')<br/>
+ * 
+ * <b>Path tags</b> AllLoader supports configuring different loaders for the
+ * same extension based on their file path.<br/>
+ * E.g.<br/>
+ * We have the paths /log/type1, /log/type2<br/>
+ * For each of these directories we'd like to use different loaders.<br/>
+ * So we set up our loaders:<br/>
+ * file.extension.loaders:gz:type1:MyType1Loader, gz:type2:MyType2Loader<br/>
+ * 
+ * 
+ * <p/>
+ * <b>Sequence files</b> Sequence files also support using the Path tags for
+ * loader selection but have an extra configuration option that relates to the
+ * Key Class used to write the Sequence file.<br/>
+ * E.g. for HiveRC this value is: org.apache.hadoop.hive.ql.io.RCFile so we can
+ * setup our sequence file formatting:<br/>
+ * file.extension.loaders:seq::org.apache.hadoop.hive.ql.io.RCFile:
+ * MyHiveRCLoader, seq::DefaultSequenceFileLoader<br/>
+ * 
+ * <p/>
+ * <b>Schema</b> The JsonMetadata schema loader is supported and the schema
+ * will be loaded using this loader.<br/>
+ * In case this fails, the schema can be loaded using the default schema
+ * provided.
+ * 
+ */
+public class AllLoader extends FileInputLoadFunc implements LoadMetadata,
+        StoreMetadata, LoadPushDown {
+
+    private static final Logger LOG = Logger.getLogger(AllLoader.class);
+
+    private static final String PROJECTION_ID = AllLoader.class.getName()
+            + ".projection";
+
+    transient LoadFunc childLoadFunc;
+    transient boolean supportPushDownProjection = false;
+    transient RequiredFieldList requiredFieldList;
+    transient SortedSet<Integer> requiredFieldHashSet;
+
+    transient TupleFactory tupleFactory = TupleFactory.getInstance();
+    transient ResourceSchema schema;
+
+    String signature;
+
+    /**
+     * Implements the logic for searching partition keys and applying partition
+     * filtering
+     */
+    transient PathPartitionHelper pathPartitionerHelper = new PathPartitionHelper();
+    transient Map<String, String> currentPathPartitionKeyMap;
+    transient String[] partitionColumns;
+
+    transient JsonMetadata jsonMetadata;
+    transient boolean partitionKeysSet = false;
+
+    LoadFuncHelper loadFuncHelper = null;
+
+    transient Configuration conf;
+    transient Path currentPath;
+
+    String constructorPassedPartitionFilter;
+
+    public AllLoader() {
+        jsonMetadata = new JsonMetadata();
+    }
+
+    /**
+     * Creates an AllLoader that only reads files whose path partition keys
+     * satisfy the given filter expression. The filter is handed to the
+     * PathPartitionHelper when setLocation is called.
+     * 
+     * @param partitionFilter
+     *            partition filter expression e.g. daydate<"2010-11-01"; when
+     *            null no constructor filter is applied in setLocation
+     */
+    public AllLoader(String partitionFilter) {
+        this();
+        // string concatenation is null safe, calling toString() on the
+        // argument is not
+        LOG.debug("PartitionFilter: " + partitionFilter);
+
+        constructorPassedPartitionFilter = partitionFilter;
+
+    }
+
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        FileInputFormat.setInputPaths(job, location);
+        // called on the front end
+        conf = job.getConfiguration();
+        loadFuncHelper = new LoadFuncHelper(conf);
+
+        if (constructorPassedPartitionFilter != null) {
+
+            pathPartitionerHelper.setPartitionFilterExpression(
+                    constructorPassedPartitionFilter, AllLoader.class,
+                    signature);
+
+        }
+
+        getPartitionKeys(location, job);
+    }
+
+    @Override
+    public LoadCaster getLoadCaster() throws IOException {
+        return new Utf8StorageConverter();
+    }
+
+    @Override
+    public AllLoaderInputFormat getInputFormat() throws IOException {
+        // this plugs the AllLoaderInputFormat into the system, which in turn
+        // will plug in the AllRecordReader
+        // the AllRecordReader will select and create the correct LoadFunc
+        return new AllLoaderInputFormat(signature);
+    }
+
+    /**
+     * Prepares this loader for reading one split.<br/>
+     * The reader must be the AllReader created by the AllLoaderInputFormat.
+     * The partition key values are re-read whenever the reader's path differs
+     * from the previously seen path, the child LoadFunc selected by the
+     * AllReader is prepared, and any projection saved by pushProjection is
+     * restored from the UDF context properties.
+     * 
+     * @param reader
+     *            the AllReader for the current split
+     * @param split
+     *            the split passed on to the child LoadFunc
+     * @throws IOException
+     *             if the projection cannot be deserialized or the child
+     *             LoadFunc fails to prepare
+     */
+    @Override
+    public void prepareToRead(
+            @SuppressWarnings("rawtypes") RecordReader reader, PigSplit split)
+            throws IOException {
+
+        AllReader allReader = (AllReader) reader;
+
+        // only recompute the partition key values when the file path changes;
+        // without partition columns there is nothing to extract from the path
+        if (currentPath == null || !(currentPath.equals(allReader.path))) {
+            currentPathPartitionKeyMap = (partitionColumns == null) ? null
+                    : pathPartitionerHelper
+                            .getPathPartitionKeyValues(allReader.path
+                                    .toString());
+            currentPath = allReader.path;
+        }
+
+        childLoadFunc = allReader.prepareLoadFuncForReading(split);
+
+        // written by pushProjection as a base64 encoded, java serialized
+        // RequiredFieldList
+        String projectProperty = getUDFContext().getProperty(PROJECTION_ID);
+
+        if (projectProperty != null) {
+
+            // load the required field list from the current UDF context
+            ByteArrayInputStream input = new ByteArrayInputStream(
+                    Base64.decodeBase64(projectProperty.getBytes("UTF-8")));
+
+            ObjectInputStream objInput = new ObjectInputStream(input);
+
+            try {
+                requiredFieldList = (RequiredFieldList) objInput.readObject();
+            } catch (ClassNotFoundException e) {
+                throw new FrontendException(e.toString(), e);
+            } finally {
+                IOUtils.closeStream(objInput);
+            }
+
+            // NOTE(review): this test asks whether LoadPushDown can be
+            // assigned to the child's class, which looks inverted;
+            // presumably "childLoadFunc instanceof LoadPushDown" was
+            // intended. As written the branch appears never to be taken for
+            // a LoadFunc subclass, so projection is always done in getNext.
+            // Confirm before changing: the getNext fallback is also the only
+            // place where the partition columns are filled in.
+            if (childLoadFunc.getClass().isAssignableFrom(LoadPushDown.class)) {
+                supportPushDownProjection = true;
+                ((LoadPushDown) childLoadFunc)
+                        .pushProjection(requiredFieldList);
+            } else {
+                if (requiredFieldList != null) {
+                    // sorted set of the required column indexes, used by
+                    // getNext to project each tuple
+                    requiredFieldHashSet = new TreeSet<Integer>();
+                    for (RequiredField requiredField : requiredFieldList
+                            .getFields()) {
+                        requiredFieldHashSet.add(requiredField.getIndex());
+                    }
+                }
+            }
+
+        }
+
+    }
+
+    /**
+     * Returns the next tuple from the child LoadFunc, projected to the
+     * required fields when a projection was pushed and the child does not
+     * handle it itself.
+     * 
+     * @return the next tuple, or null when the child LoadFunc returns null
+     * @throws IOException
+     *             if the child LoadFunc fails to read
+     */
+    @Override
+    public Tuple getNext() throws IOException {
+        // Delegate the work to the child load func that was selected based on
+        // the file type and other criteria.
+        // The AllLoader supports push down projection even if the child
+        // LoadFunc does not, so the supportPushDownProjection flag is checked
+        // here: if true the child's getNext result is used as is, if not the
+        // fields that are not required by the pushed projection are removed.
+
+        Tuple tuple = null;
+
+        if (supportPushDownProjection) {
+            tuple = childLoadFunc.getNext();
+        } else if ((tuple = childLoadFunc.getNext()) != null) {
+            // ----- If the function does not support projection we do it here
+
+            if (requiredFieldHashSet != null) {
+
+                Tuple projectedTuple = tupleFactory
+                        .newTuple(requiredFieldHashSet.size());
+                int i = 0;
+                int tupleSize = tuple.size();
+
+                // indexes are iterated in ascending order (SortedSet); an
+                // index past the end of the child's tuple is treated as a
+                // partition column
+                // NOTE(review): this assumes currentPathPartitionKeyMap and
+                // partitionColumns are non null whenever index >= tupleSize
+                // - verify for tuples shorter than the schema
+                for (int index : requiredFieldHashSet) {
+                    if (index < tupleSize) {
+                        // add the tuple columns
+                        projectedTuple.set(i++, tuple.get(index));
+                    } else {
+                        // add the partition columns
+                        projectedTuple.set(i++, currentPathPartitionKeyMap
+                                .get(partitionColumns[index - tupleSize]));
+                    }
+                }
+
+                tuple = projectedTuple;
+            }
+
+        }
+
+        return tuple;
+    }
+
+    @Override
+    public List<OperatorSet> getFeatures() {
+        return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+    }
+
+    @Override
+    public RequiredFieldResponse pushProjection(
+            RequiredFieldList requiredFieldList) throws FrontendException {
+        // save the required field list to the UDFContext properties.
+
+        Properties properties = getUDFContext();
+
+        ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
+        ObjectOutputStream objOut = null;
+        try {
+            objOut = new ObjectOutputStream(byteArray);
+            objOut.writeObject(requiredFieldList);
+        } catch (IOException e) {
+            throw new FrontendException(e.toString(), e);
+        } finally {
+            IOUtils.closeStream(objOut);
+        }
+
+        // write out the whole required fields list as a base64 string
+        try {
+            properties.setProperty(PROJECTION_ID,
+                    new String(Base64.encodeBase64(byteArray.toByteArray()),
+                            "UTF-8"));
+        } catch (UnsupportedEncodingException e) {
+            throw new FrontendException(e.toString(), e);
+        }
+
+        return new RequiredFieldResponse(true);
+    }
+
+    /**
+     * Tries to determine the LoadFunc by using the LoadFuncHelper to identify a
+     * loader for the first file in the location directory.<br/>
+     * If no LoadFunc can be determine ad FrontendException is thrown.<br/>
+     * If the LoadFunc implements the LoadMetadata interface and returns a non
+     * null schema this schema is returned.
+     * 
+     * @param location
+     * @param job
+     * @return
+     * @throws IOException
+     */
+    private ResourceSchema getSchemaFromLoadFunc(String location, Job job)
+            throws IOException {
+
+        ResourceSchema schema = null;
+
+        if (loadFuncHelper == null) {
+            loadFuncHelper = new LoadFuncHelper(job.getConfiguration());
+        }
+
+        Path firstFile = loadFuncHelper.determineFirstFile(location);
+
+        if (childLoadFunc == null) {
+
+            // choose loader
+            FuncSpec funcSpec = loadFuncHelper.determineFunction(location,
+                    firstFile);
+
+            if (funcSpec == null) {
+                // throw front end exception, no loader could be determined.
+                throw new FrontendException(
+                        "No LoadFunction could be determined for " + location);
+            }
+
+            childLoadFunc = (LoadFunc) PigContext
+                    .instantiateFuncFromSpec(funcSpec);
+        }
+
+        LOG.debug("Found LoadFunc:  " + childLoadFunc.getClass().getName());
+
+        if (childLoadFunc instanceof LoadMetadata) {
+            schema = ((LoadMetadata) childLoadFunc).getSchema(firstFile.toUri()
+                    .toString(), job);
+            LOG.debug("Found schema " + schema + " from loadFunc:  "
+                    + childLoadFunc.getClass().getName());
+        }
+
+        return schema;
+    }
+
+    /**
+     * Returns the schema for the location, computing it on the first call and
+     * returning the cached value afterwards.<br/>
+     * The schema is first requested from the JsonMetadata loader; if that
+     * returns null it is taken from the LoadFunc selected for the location.
+     * When a schema was found, the partition keys are appended to it once as
+     * chararray fields.
+     * 
+     * @param location
+     * @param job
+     * @return the schema, or null if none could be found
+     * @throws IOException
+     */
+    @Override
+    public ResourceSchema getSchema(String location, Job job)
+            throws IOException {
+
+        if (schema == null) {
+            ResourceSchema foundSchema = jsonMetadata.getSchema(location, job);
+
+            // determine schema from files in location
+            if (foundSchema == null) {
+                foundSchema = getSchemaFromLoadFunc(location, job);
+
+            }
+
+            // only add the partition keys if the schema is not null
+            // we use the partitionKeySet to only set partition keys once.
+            if (!(partitionKeysSet || foundSchema == null)) {
+                String[] keys = getPartitionColumns(location, job);
+
+                if (!(keys == null || keys.length == 0)) {
+
+                    // re-edit the pigSchema to contain the new partition keys.
+                    ResourceFieldSchema[] fields = foundSchema.getFields();
+
+                    LOG.debug("Schema: " + Arrays.toString(fields));
+
+                    ResourceFieldSchema[] newFields = Arrays.copyOf(fields,
+                            fields.length + keys.length);
+
+                    int index = fields.length;
+
+                    for (String key : keys) {
+                        newFields[index++] = new ResourceFieldSchema(
+                                new FieldSchema(key, DataType.CHARARRAY));
+                    }
+
+                    foundSchema.setFields(newFields);
+
+                    // concatenating the array itself would only log its
+                    // identity ([Ljava.lang.String;@...), not the key names
+                    LOG.debug("Added partition fields: "
+                            + Arrays.toString(keys) + " to loader schema");
+                    LOG.debug("Schema is: " + Arrays.toString(newFields));
+                }
+
+                partitionKeysSet = true;
+
+            }
+
+            schema = foundSchema;
+        }
+
+        return schema;
+    }
+
+    @Override
+    public ResourceStatistics getStatistics(String location, Job job)
+            throws IOException {
+        return null;
+    }
+
+    @Override
+    public void storeStatistics(ResourceStatistics stats, String location,
+            Job job) throws IOException {
+
+    }
+
+    @Override
+    public void storeSchema(ResourceSchema schema, String location, Job job)
+            throws IOException {
+        jsonMetadata.storeSchema(schema, location, job);
+    }
+
+    /**
+     * Reads the partition columns
+     * 
+     * @param location
+     * @param job
+     * @return
+     */
+    private String[] getPartitionColumns(String location, Job job) {
+
+        if (partitionColumns == null) {
+            // read the partition columns from the UDF Context first.
+            // if not in the UDF context then read it using the PathPartitioner.
+
+            Properties properties = getUDFContext();
+
+            if (properties == null) {
+                properties = new Properties();
+            }
+
+            String partitionColumnStr = properties
+                    .getProperty(PathPartitionHelper.PARTITION_COLUMNS);
+
+            if (partitionColumnStr == null
+                    && !(location == null || job == null)) {
+                // if it hasn't been written yet.
+                Set<String> partitionColumnSet;
+
+                try {
+                    partitionColumnSet = pathPartitionerHelper
+                            .getPartitionKeys(location, job.getConfiguration());
+                } catch (IOException e) {
+
+                    RuntimeException rte = new RuntimeException(e);
+                    rte.setStackTrace(e.getStackTrace());
+                    throw rte;
+
+                }
+
+                if (partitionColumnSet != null) {
+
+                    StringBuilder buff = new StringBuilder();
+
+                    int i = 0;
+                    for (String column : partitionColumnSet) {
+                        if (i++ != 0) {
+                            buff.append(',');
+                        }
+
+                        buff.append(column);
+                    }
+
+                    String buffStr = buff.toString().trim();
+
+                    if (buffStr.length() > 0) {
+
+                        properties.setProperty(
+                                PathPartitionHelper.PARTITION_COLUMNS,
+                                buff.toString());
+                    }
+
+                    partitionColumns = partitionColumnSet
+                            .toArray(new String[] {});
+
+                }
+
+            } else {
+                // the partition columns has been set already in the UDF Context
+                if (partitionColumnStr != null) {
+                    String split[] = partitionColumnStr.split(",");
+                    Set<String> partitionColumnSet = new LinkedHashSet<String>();
+                    if (split.length > 0) {
+                        for (String splitItem : split) {
+                            partitionColumnSet.add(splitItem);
+                        }
+                    }
+
+                    partitionColumns = partitionColumnSet
+                            .toArray(new String[] {});
+                }
+
+            }
+
+        }
+
+        return partitionColumns;
+
+    }
+
+    /**
+     * Returns the partition keys found for the location.
+     * 
+     * @param location
+     * @param job
+     * @return the partition keys, or null when no partition columns could be
+     *         determined for the location
+     * @throws IOException
+     */
+    @Override
+    public String[] getPartitionKeys(String location, Job job)
+            throws IOException {
+
+        // A null result is not an error: it means the data is not
+        // partitioned. setLocation calls this method for every load, so
+        // throwing here would break loading of non partitioned locations.
+        String[] partitionKeys = getPartitionColumns(location, job);
+
+        LOG.info("Get Parition Keys for: " + location + " keys: "
+                + Arrays.toString(partitionKeys));
+
+        return partitionKeys;
+    }
+
+    // --------------- Save Signature and PartitionFilter Expression
+    // ----------------- //
+    @Override
+    public void setUDFContextSignature(String signature) {
+        this.signature = signature;
+        super.setUDFContextSignature(signature);
+    }
+
+    private Properties getUDFContext() {
+        return UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+                new String[] { signature });
+    }
+
+    @Override
+    public void setPartitionFilter(Expression partitionFilter)
+            throws IOException {
+        LOG.debug("PartitionFilter: " + partitionFilter.toString());
+
+        pathPartitionerHelper.setPartitionFilterExpression(
+                partitionFilter.toString(), AllLoader.class, signature);
+
+    }
+
+    /**
+     * InputFormat that encapsulates the correct input format based on the file
+     * type.
+     * 
+     */
+    public static class AllLoaderInputFormat extends
+            FileInputFormat<Writable, Writable> {
+
+        transient PathPartitionHelper partitionHelper = new PathPartitionHelper();
+        String udfSignature;
+
+        public AllLoaderInputFormat(String udfSignature) {
+            super();
+            this.udfSignature = udfSignature;
+        }
+
+        @Override
+        protected List<FileStatus> listStatus(JobContext jobContext)
+                throws IOException {
+
+            List<FileStatus> files = partitionHelper.listStatus(jobContext,
+                    AllLoader.class, udfSignature);
+
+            if (files == null)
+                files = super.listStatus(jobContext);
+
+            return files;
+
+        }
+
+        @Override
+        public RecordReader<Writable, Writable> createRecordReader(
+                InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+                throws IOException, InterruptedException {
+
+            // this method plugs the AllReader into the system, and the
+            // AllReader will when called select the correct LoadFunc
+            // return new AllReader(udfSignature);
+            return new AllReader(udfSignature);
+        }
+
+    }
+
+    /**
+     * This is where the logic is for selecting the correct Loader.
+     * 
+     */
+    public static class AllReader extends RecordReader<Writable, Writable> {
+
+        LoadFunc selectedLoadFunc;
+        RecordReader<Writable, Writable> selectedReader;
+        LoadFuncHelper loadFuncHelper = null;
+        String udfSignature;
+        Path path;
+
+        public AllReader(String udfSignature) {
+            this.udfSignature = udfSignature;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void initialize(InputSplit inputSplit,
+                TaskAttemptContext taskAttemptContext) throws IOException,
+                InterruptedException {
+
+            FileSplit fileSplit = (FileSplit) inputSplit;
+
+            path = fileSplit.getPath();
+            String fileName = path.toUri().toString();
+
+            // select the correct load function and initialise
+            loadFuncHelper = new LoadFuncHelper(
+                    taskAttemptContext.getConfiguration());
+
+            FuncSpec funcSpec = loadFuncHelper.determineFunction(fileName);
+
+            if (funcSpec == null) {
+                throw new IOException("Cannot determine LoadFunc for "
+                        + fileName);
+            }
+
+            selectedLoadFunc = (LoadFunc) PigContext
+                    .instantiateFuncFromSpec(funcSpec);
+
+            selectedLoadFunc.setUDFContextSignature(udfSignature);
+            selectedLoadFunc.setLocation(fileName,
+                    new Job(taskAttemptContext.getConfiguration(),
+                            taskAttemptContext.getJobName()));
+
+            selectedReader = selectedLoadFunc.getInputFormat()
+                    .createRecordReader(fileSplit, taskAttemptContext);
+
+            selectedReader.initialize(fileSplit, taskAttemptContext);
+
+            LOG.info("Using LoadFunc " + selectedLoadFunc.getClass().getName()
+                    + " on " + fileName);
+
+        }
+
+        // ---------------------- all functions below this line delegate work to
+        // the selectedReader ------------//
+
+        public LoadFunc prepareLoadFuncForReading(PigSplit split)
+                throws IOException {
+
+            selectedLoadFunc.prepareToRead(selectedReader, split);
+            return selectedLoadFunc;
+
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            return selectedReader.nextKeyValue();
+        }
+
+        @Override
+        public Writable getCurrentKey() throws IOException,
+                InterruptedException {
+            return selectedReader.getCurrentKey();
+        }
+
+        @Override
+        public Writable getCurrentValue() throws IOException,
+                InterruptedException {
+            return selectedReader.getCurrentValue();
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+            return selectedReader.getProgress();
+        }
+
+        /**
+         * Closes the reader selected in initialize, if there is one.
+         * 
+         * @throws IOException
+         *             if the selected reader fails to close
+         */
+        @Override
+        public void close() throws IOException {
+            // initialize can fail before a reader was selected (e.g. no
+            // LoadFunc could be determined); closing must not hide that
+            // failure behind a NullPointerException
+            if (selectedReader != null) {
+                selectedReader.close();
+            }
+        }
+
+    }
+
+}

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/allloader/LoadFuncHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/allloader/LoadFuncHelper.java?rev=1035726&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/allloader/LoadFuncHelper.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/allloader/LoadFuncHelper.java Tue Nov 16 18:23:23 2010
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.storage.allloader;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+
+/**
+ * 
+ * Contains the logic for finding a LoadFunc based on the definition of:
+ * <ul>
+ * <li>file.extension.loaders</li>
+ * <li>file.format.loaders</li>
+ * </ul>
+ * 
+ */
+public class LoadFuncHelper {
+
+    public static final String FILE_EXTENSION_LOADERS = "file.extension.loaders";
+
+    /**
+     * The most common type of file formats are supported i.e. SEQ, GZ, BZ2, LZO
+     * This is used when a file does not have an extension. If this is true the
+     * first 3 bytes can be read from a file to determine its content type. The
+     * 3 bytes are then mapped to an extension for which an entry must exist in
+     * file.extension.loaders. If the content does not match any entry in
+     * magicNumberExtensionMap plain text is assumed.
+     */
+    private static Map<MagicNumber, String> magicNumberExtensionMap = buildMagicNumberExtensionMap();
+
+    Configuration conf;
+
+    FileSystem fileSystem;
+
+    /**
+     * Stores the extension:tag = load function pairs
+     */
+    Map<String, String> loadFunctionExtensionTagMap;
+
+    /**
+     * Stores the extension = tags pairs
+     */
+    Map<String, Set<String>> extensionTagsMap;
+
+    /**
+     * Builds the extension/tag to loader lookup tables from the
+     * file.extension.loaders configuration property.
+     * <p>
+     * The property value is split on "),"; each resulting entry is split on
+     * ":" and must have one of these shapes:
+     * <ul>
+     * <li>extension:loader</li>
+     * <li>extension:pathTag:loader</li>
+     * <li>extension:pathTag:sequenceFileKeyClass:loader</li>
+     * </ul>
+     * When the property is not set both tables stay empty.
+     * 
+     * @param conf
+     *            configuration used to obtain the FileSystem and to read the
+     *            file.extension.loaders property
+     * @throws IOException
+     *             a FrontendException is thrown when an entry does not have
+     *             2, 3 or 4 colon separated parts
+     */
+    public LoadFuncHelper(Configuration conf) throws IOException {
+        this.conf = conf;
+        fileSystem = FileSystem.get(conf);
+
+        loadFunctionExtensionTagMap = new HashMap<String, String>();
+        extensionTagsMap = new HashMap<String, Set<String>>();
+
+        String fileExtensionLoaders = conf.get(FILE_EXTENSION_LOADERS);
+
+        if (fileExtensionLoaders != null) {
+            String[] loaderExtensionPairs = fileExtensionLoaders.split("\\),");
+            for (String loaderExtensionPairStr : loaderExtensionPairs) {
+
+                String[] loaderExtensionPair = loaderExtensionPairStr
+                        .split(":");
+                if (loaderExtensionPair.length == 2) {
+                    // we have extension:loader assign EMPTY TAG
+                    loadFunctionExtensionTagMap.put(
+                            loaderExtensionPair[0].trim() + ":",
+                            loaderExtensionPair[1].trim());
+                } else if (loaderExtensionPair.length == 3
+                        || loaderExtensionPair.length == 4) {
+                    // we have extension:pathTag:loader assign TAG
+                    String ext = loaderExtensionPair[0].trim();
+                    String tag = loaderExtensionPair[1].trim();
+
+                    String key = ext + ":" + tag;
+
+                    String loadFunc = loaderExtensionPair[2].trim();
+                    // support key class names for sequence files
+                    if (loaderExtensionPair.length == 4) {
+                        // loadFunc here is not loadFunc but the sequence file
+                        // key class
+                        key += ":" + loadFunc;
+                        loadFunc = loaderExtensionPair[3].trim();
+                    }
+
+                    loadFunctionExtensionTagMap.put(key, loadFunc);
+
+                    // remember which tags exist for this extension so that
+                    // paths can later be searched for them
+                    Set<String> tags = extensionTagsMap.get(ext);
+                    if (tags == null) {
+                        tags = new TreeSet<String>();
+                        extensionTagsMap.put(ext, tags);
+                    }
+
+                    tags.add(tag);
+
+                } else {
+                    // name the offending entry so the user can find it in the
+                    // property value
+                    throw new FrontendException(
+                            "Badly formatted file.extension.loaders entry '"
+                                    + loaderExtensionPairStr
+                                    + "', format is <extension>[:<tag>[:<keyClass>]]:<loader>,...");
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Creates the table that maps the first three bytes of a file to the
+     * extension used for the loader lookup (seq, lzo, gz, bz2).
+     * 
+     * @return a new map from MagicNumber to extension
+     */
+    private static Map<MagicNumber, String> buildMagicNumberExtensionMap() {
+        Map<MagicNumber, String> byMagic = new HashMap<MagicNumber, String>();
+
+        byte[] seqMagic = { 'S', 'E', 'Q' };
+        byte[] lzoMagic = { (byte) 0x89, 'L', 'Z' };
+        byte[] gzMagic = { 0x1f, (byte) 0x8b, 0x08 };
+        byte[] bz2Magic = { 'B', 'Z', 'h' };
+
+        byMagic.put(new MagicNumber(seqMagic), "seq");
+        byMagic.put(new MagicNumber(lzoMagic), "lzo");
+        byMagic.put(new MagicNumber(gzMagic), "gz");
+        byMagic.put(new MagicNumber(bz2Magic), "bz2");
+
+        return byMagic;
+    }
+
+    /**
+     * Returns the path of the file to inspect for the given location. If
+     * location is a directory the first file found by getFirstFile is
+     * returned, otherwise the location itself is returned as a Path.
+     * 
+     * @param location
+     *            a file or directory
+     * @return the location itself, or the first file found below it when it is
+     *         a directory
+     * @throws IOException
+     *             if no file is found a FrontendException is thrown
+     */
+    public Path determineFirstFile(String location) throws IOException {
+        Path path = new Path(location);
+        FileStatus status = fileSystem.getFileStatus(path);
+
+        if (status.isDir()) {
+            // keep the directory in path until a file was found, so that the
+            // error message names the directory instead of "null"
+            Path firstFile = getFirstFile(fileSystem, path);
+
+            if (firstFile == null) {
+                throw new FrontendException(path + " has no files");
+            }
+
+            path = firstFile;
+        }
+
+        return path;
+    }
+
+    /**
+     * 
+     * Resolves the file to inspect with determineFirstFile(location), i.e. if
+     * location is a directory the first file found in the directory is used,
+     * and passes it to determineFunction(String, Path).
+     * 
+     * @param location
+     *            a file or directory
+     * @return the FuncSpec returned by determineFunction(String, Path)
+     * @throws IOException
+     */
+    public FuncSpec determineFunction(String location) throws IOException {
+        return determineFunction(location, determineFirstFile(location));
+    }
+
+    /**
+     * 
+     * Looks up the loader for a file. The file name extensions are searched
+     * against the file.extension.loaders mappings first; when that gives no
+     * result the loader is determined from the file content.
+     * 
+     * @param location
+     *            not used by this method
+     * @param path
+     *            the file to find a loader for
+     * @return the FuncSpec found by extension, else the one found by content
+     * @throws IOException
+     */
+    public FuncSpec determineFunction(String location, Path path)
+            throws IOException {
+
+        FuncSpec byExtension = getLoadPerExtension(path.getName(), path);
+        if (byExtension != null) {
+            return byExtension;
+        }
+
+        // no extension mapping matched, look for loaders by the content
+        // definition
+        return getFuncSpecFromContent(path);
+    }
+
+    /**
+     * Tries to identify the extension, and thereby the loader, from the first
+     * three bytes of the file.
+     * <p>
+     * When fewer than three bytes could be read, or the bytes match no entry
+     * in magicNumberExtensionMap, plain text is assumed and PigStorage() is
+     * returned. Otherwise the loader is looked up for the mapped extension and
+     * the applicable path tag; for sequence files the key class of the file is
+     * tried as an additional key part first.
+     * 
+     * @param path
+     *            the file to inspect
+     * @return the FuncSpec of the loader to use, never null
+     * @throws IOException
+     *             if the file cannot be opened or read
+     * @throws RuntimeException
+     *             if the content type is recognised but no loader is
+     *             configured for it
+     */
+    private FuncSpec getFuncSpecFromContent(Path path) throws IOException {
+        // get the first three bytes from the file.
+        FSDataInputStream dataIn = null;
+        byte[] magic = new byte[3];
+        int read = -1;
+
+        try {
+            dataIn = fileSystem.open(path, 3);
+            read = dataIn.read(magic);
+        } finally {
+            // open() may have thrown before dataIn was assigned; closing a
+            // null stream here would replace that exception with a
+            // NullPointerException
+            if (dataIn != null) {
+                dataIn.close();
+            }
+        }
+
+        FuncSpec funcSpec = null;
+        String extensionMapping = magicNumberExtensionMap.get(new MagicNumber(
+                magic));
+
+        if (read < magic.length || extensionMapping == null) {
+            // assume plain text
+            funcSpec = new FuncSpec("PigStorage()");
+        } else {
+            // an extension mapping was found. i.e. this is a GZ, BZ2, LZO or
+            // SEQ file
+
+            String applicableTag = getApplicableTag(extensionMapping, path);
+            String loadFuncDefinition = null;
+
+            if (extensionMapping.equals("seq")) {
+                // if this is a sequence file we load the key class also
+                loadFuncDefinition = loadFunctionExtensionTagMap
+                        .get(extensionMapping + ":" + applicableTag + ":"
+                                + getSequenceFileKeyClass(path));
+
+            }
+
+            // we do this also for sequence files because a sequence file
+            // might or might not have a key class associated in the extension
+            // mapping. If the key class is not found above in the mapping,
+            // the default sequence file loader needs to be used as per the
+            // extension mapping.
+            if (loadFuncDefinition == null) {
+                // use only extension and tag filtering
+                loadFuncDefinition = loadFunctionExtensionTagMap
+                        .get(extensionMapping + ":" + applicableTag);
+
+            }
+
+            if (loadFuncDefinition == null) {
+                // if still null throw an error
+                throw new RuntimeException("Cannot find loader for " + path
+                        + " extension mapping " + extensionMapping);
+            }
+
+            funcSpec = new FuncSpec(loadFuncDefinition);
+        }
+
+        return funcSpec;
+    }
+
+    /**
+     * Opens a SequenceFile.Reader on the file and returns its key class name,
+     * cut off before the first '$' when the name contains one after its first
+     * character.
+     * 
+     * @param path
+     *            the sequence file
+     * @return the key class name
+     * @throws IOException
+     */
+    private String getSequenceFileKeyClass(Path path) throws IOException {
+
+        SequenceFile.Reader seqReader = new SequenceFile.Reader(fileSystem,
+                path, conf);
+
+        try {
+            String className = seqReader.getKeyClassName();
+            int dollarAt = className.indexOf("$");
+            return dollarAt > 0 ? className.substring(0, dollarAt) : className;
+        } finally {
+            seqReader.close();
+        }
+    }
+
+    /**
+     * Searches for the loader based on the extension and tag mappings.
+     * <p>
+     * The last extension of the file name is tried first; when no loader is
+     * mapped for it the extension is cut off and the next one is tried, e.g.
+     * file.rc.gz tries gz and then rc. The search stops without a result as
+     * soon as no extension is left or no tag applies to the extension.
+     * 
+     * @param fileName
+     *            the name of the file
+     * @param path
+     *            the path of the file, searched for tags
+     * @return the FuncSpec of the mapped loader, null if none is found
+     */
+    private FuncSpec getLoadPerExtension(String fileName, Path path) {
+
+        String remaining = fileName;
+
+        while (remaining != null) {
+            String extension = getExtension(remaining);
+            if (extension == null) {
+                return null;
+            }
+
+            String tag = getApplicableTag(extension, path);
+            if (tag == null) {
+                return null;
+            }
+
+            String definition = loadFunctionExtensionTagMap.get(extension
+                    + ":" + tag);
+            if (definition != null) {
+                // create the LoadFunc
+                return new FuncSpec(definition);
+            }
+
+            remaining = cutExtension(remaining);
+        }
+
+        return null;
+    }
+
+    /**
+     * Finds the first of the tags registered for the extension that occurs in
+     * the path.<br/>
+     * If the extension has no tags an empty string is returned.<br/>
+     * If it has tags and none of them is contained in the path null is
+     * returned.<br/>
+     * 
+     * @param extension
+     *            the extension whose tags are searched for
+     * @param path
+     *            the path searched for the tags
+     * @return the matching tag, an empty string or null as described above
+     */
+    private String getApplicableTag(String extension, Path path) {
+
+        Set<String> candidates = extensionTagsMap.get(extension);
+        if (candidates == null) {
+            return "";
+        }
+
+        String fullPathName = path.toUri().toString();
+        for (String candidate : candidates) {
+            if (fullPathName.contains(candidate)) {
+                return candidate;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Removes the last extension from a file name.
+     * 
+     * @param fileName
+     *            the file name to shorten
+     * @return String the file name without the last extension e.g. file.rc.gz
+     *         will return file.rc; null when the name has no '.' after its
+     *         first character
+     */
+    private static String cutExtension(String fileName) {
+        int dotAt = fileName.lastIndexOf('.');
+        boolean hasExtension = dotAt > 0 && dotAt < fileName.length();
+
+        return hasExtension ? fileName.substring(0, dotAt) : null;
+    }
+
+    /**
+     * Extracts the last extension of a file name.
+     * 
+     * @param fileName
+     *            the file name to inspect
+     * @return String the last file name extension e.g. file.rc.gz will return
+     *         gz; null when the name has no '.' after its first character or
+     *         ends with the '.'
+     */
+    private static String getExtension(String fileName) {
+        int dotAt = fileName.lastIndexOf('.');
+        int start = dotAt + 1;
+
+        if (dotAt <= 0 || start >= fileName.length()) {
+            return null;
+        }
+
+        return fileName.substring(start);
+    }
+
+    /**
+     * Looks for and returns the first file it can find below path, descending
+     * into sub directories in listing order. Entries whose name starts with
+     * "." or "_" are treated as hidden and skipped.
+     * 
+     * @param fileSystem
+     *            the file system to list
+     * @param path
+     *            the directory to search
+     * @return Path null if no file was found
+     * @throws IOException
+     */
+    private static Path getFirstFile(FileSystem fileSystem, Path path)
+            throws IOException {
+
+        FileStatus[] children = fileSystem.listStatus(path);
+        if (children == null) {
+            // NOTE(review): some FileSystem implementations appear to return
+            // null instead of an empty array - treat it as "no files"
+            return null;
+        }
+
+        for (FileStatus child : children) {
+
+            Path childPath = child.getPath();
+            String childName = childPath.getName();
+
+            // if hidden file skip.
+            if (childName.startsWith(".") || childName.startsWith("_")) {
+                continue;
+            }
+
+            if (!child.isDir()) {
+                // first file found return.
+                return childPath;
+            }
+
+            // stop at the first sub directory that contains a file, so that a
+            // later (possibly empty) sibling directory cannot replace the
+            // result
+            Path nested = getFirstFile(fileSystem, childPath);
+            if (nested != null) {
+                return nested;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Wraps the leading bytes of a file so that they can be used as a map key:
+     * equality and hash code are based on the content of the byte array.
+     */
+    static class MagicNumber {
+
+        byte[] magic;
+
+        public MagicNumber(byte[] magic) {
+            super();
+            this.magic = magic;
+        }
+
+        @Override
+        public int hashCode() {
+            return 31 + Arrays.hashCode(magic);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            return Arrays.equals(magic, ((MagicNumber) obj).magic);
+        }
+
+    }
+}

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAllLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAllLoader.java?rev=1035726&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAllLoader.java (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAllLoader.java Tue Nov 16 18:23:23 2010
@@ -0,0 +1,694 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.piggybank.storage.AllLoader;
+import org.apache.pig.piggybank.storage.HiveColumnarLoader;
+import org.apache.pig.piggybank.storage.allloader.LoadFuncHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+public class TestAllLoader extends TestCase {
+
+    /**
+     * The file types written by the tests: each constant pairs a file name
+     * extension with the FileTestWriter instance passed in for that type.
+     */
+    enum TYPE {
+        HIVERC(".rc", new HiveRCFileTestWriter()), GZIP_PLAIN(".gz",
+                new GzipFileTestWriter());
+
+        // file name extension of this type, including the leading dot
+        String extension;
+        // writer instance associated with this type
+        FileTestWriter writer;
+
+        TYPE(String extension, FileTestWriter writer) {
+            this.extension = extension;
+            this.writer = writer;
+        }
+
+    }
+
+    String colSeparator = ",";
+
+    /**
+     * All test files will contain this amount of records
+     */
+    int fileRecords = 100;
+    /**
+     * All files will contain this amount of columns in each row
+     */
+    int fileColumns = 2;
+
+    final TYPE fileTypes[] = new TYPE[] { TYPE.GZIP_PLAIN, TYPE.HIVERC };
+
+    final String extensionLoaders = "gz:org.apache.pig.builtin.PigStorage('"
+            + colSeparator + "'), rc:"
+            + HiveColumnarLoader.class.getCanonicalName()
+            + "('v1 float,v2 float')";
+
+    File baseDir;
+
+    File simpleDir;
+
+    // --------------- For date partitioning based on daydate= (this will work
+    // for year=, day= etc
+    File datePartitionDir;
+    final String[] datePartitions = new String[] { "daydate=2010-11-01",
+            "daydate=2010-11-02", "daydate=2010-11-03", "daydate=2010-11-04",
+            "daydate=2010-11-05" };
+
+    // -------------- Logic partitioning that does not involve dates
+    File logicPartitionDir;
+    final String[] logicPartitions = new String[] { "block=1", "block=2",
+            "block=3" };
+
+    // -------------- Logic Tagged partitioning that is /type1/block=1
+    // /type2/block2
+    // This is needed because the Schema for each path might be different and we
+    // want to use a different loader based on if we are
+    // looking in type1 or in type2 see taggedExtensionLoaders
+    File taggedLogicPartitionDir;
+    final String taggedExtensionLoaders = "gz:org.apache.pig.builtin.PigStorage('"
+            + colSeparator
+            + "'), rc:type1:"
+            + HiveColumnarLoader.class.getCanonicalName()
+            + "('v1 float,v2 float'), rc:type2:"
+            + HiveColumnarLoader.class.getCanonicalName() + "('v1 float')";
+
+    final String[] tags = new String[] { "type1", "type2" };
+    final String[] taggedSchemas = new String[] { "(p: float, q:float)",
+            "(p:float)" };
+
+    // -------------- Test Load Files by Content
+    File filesByContentDir;
+    final String contentLoaders = "gz:org.apache.pig.builtin.PigStorage('"
+            + colSeparator
+            + "'), rc:"
+            + HiveColumnarLoader.class.getCanonicalName()
+            + "('v1 float,v2 float'), seq::org.apache.hadoop.hive.ql.io.RCFile:"
+            + HiveColumnarLoader.class.getCanonicalName()
+            + "('v1 float,v2 float')";
+
+    Properties configuration;
+
+    final String allLoaderName = AllLoader.class.getCanonicalName();
+    final String allLoaderFuncSpec = allLoaderName + "()";
+
+    PigServer server;
+
+    /**
+     * Test that we can load files with the correct loaders as per file content.
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void testFilesByContentDir() throws IOException {
+
+        server.shutdown();
+
+        configuration.setProperty(LoadFuncHelper.FILE_EXTENSION_LOADERS,
+                contentLoaders);
+
+        server = new PigServer(ExecType.LOCAL, configuration);
+
+        server.setBatchOn();
+        server.registerFunction(allLoaderName, new FuncSpec(allLoaderFuncSpec));
+
+        server.registerQuery("a = LOAD '" + filesByContentDir.getAbsolutePath()
+                + "' using " + allLoaderFuncSpec + " as (p:float, q:float);");
+
+        readRecordsFromLoader(server, "a", fileRecords);
+
+    }
+
+    /**
+     * Test that each tag directory of the tagged logic partitioned directory
+     * can be read with the schema that belongs to the tag.
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void testTaggedLogicPartitionDir() throws IOException {
+
+        // restart the server with the tagged loader definitions
+        server.shutdown();
+        configuration.setProperty(LoadFuncHelper.FILE_EXTENSION_LOADERS,
+                taggedExtensionLoaders);
+
+        server = new PigServer(ExecType.LOCAL, configuration);
+        server.setBatchOn();
+        server.registerFunction(allLoaderName, new FuncSpec(allLoaderFuncSpec));
+
+        int expectedRows = fileRecords * logicPartitions.length
+                * fileTypes.length;
+
+        for (int idx = 0; idx < tags.length; idx++) {
+            String tag = tags[idx];
+            String schema = taggedSchemas[idx];
+
+            String tagDir = taggedLogicPartitionDir.getAbsolutePath() + "/"
+                    + tag;
+            server.registerQuery(tag + " = LOAD '" + tagDir + "' using "
+                    + allLoaderFuncSpec + " as " + schema + ";");
+
+            readRecordsFromLoader(server, tag, expectedRows);
+        }
+    }
+
+    /**
+     * Test that a logic partition can be filtered by a range, in this case
+     * block<=2
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void testLogicPartitionFilter() throws IOException {
+
+        String loadQuery = "a = LOAD '" + logicPartitionDir.getAbsolutePath()
+                + "' using " + allLoaderName + "('block<=2')"
+                + " as (q:float, p:float);";
+        server.registerQuery(loadQuery);
+        server.registerQuery("r = FOREACH a GENERATE q, p;");
+
+        int rows = 0;
+        for (Iterator<Tuple> it = server.openIterator("r"); it.hasNext(); rows++) {
+            assertEquals(2, it.next().size());
+        }
+
+        // block=3 is filtered out, so only 2 partitions are read.
+        assertEquals(fileRecords * 2 * fileTypes.length, rows);
+    }
+
+    /**
+     * Test that the partition column alone can be extracted and that every
+     * partition value occurs the expected number of times.
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void testLogicPartitionPartitionColumnExtract() throws IOException {
+
+        String loadQuery = "a = LOAD '" + logicPartitionDir.getAbsolutePath()
+                + "' using " + allLoaderFuncSpec
+                + " as (q:float, p:float, block:chararray);";
+        server.registerQuery(loadQuery);
+        server.registerQuery("r = foreach a generate block;");
+
+        Iterator<Tuple> rows = server.openIterator("r");
+
+        int total = 0;
+        Map<String, AtomicInteger> rowsPerPartition = new HashMap<String, AtomicInteger>();
+
+        while (rows.hasNext()) {
+            total++;
+            Tuple row = rows.next();
+            assertEquals(1, row.size());
+
+            String partition = row.get(0).toString();
+            AtomicInteger seen = rowsPerPartition.get(partition);
+            if (seen != null) {
+                seen.incrementAndGet();
+            } else {
+                rowsPerPartition.put(partition, new AtomicInteger(1));
+            }
+        }
+
+        // every partition that was seen must have been read completely
+        int expectedPerPartition = fileRecords * fileTypes.length;
+        for (AtomicInteger seen : rowsPerPartition.values()) {
+            assertEquals(expectedPerPartition, seen.get());
+        }
+
+        assertEquals(fileRecords * logicPartitions.length * fileTypes.length,
+                total);
+    }
+
+    /**
+     * Test that we can read a logic partitioned directory
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void testLogicPartitionDir() throws IOException {
+
+        server.registerQuery("a = LOAD '" + logicPartitionDir.getAbsolutePath()
+                + "' using " + allLoaderFuncSpec + " as (q:float, p:float);");
+
+        readRecordsFromLoader(server, "a", fileRecords * logicPartitions.length
+                * fileTypes.length);
+
+    }
+
+    /**
+     * Test that we can filter a date partitioned directory by a date range
+     * when the load statement declares a schema with AS.
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void testDateParitionFilterWithAsSchema() throws IOException {
+
+        server.registerQuery("a = LOAD '"
+                + datePartitionDir.getAbsolutePath()
+                + "' using "
+                + allLoaderName
+                + "('daydate >= \"2010-11-02\" and daydate <= \"2010-11-04\"') AS (q:float, p:float, daydate:chararray); ");
+
+        server.registerQuery("r = FOREACH a GENERATE $0;");
+
+        Iterator<Tuple> it = server.openIterator("r");
+
+        int count = 0;
+
+        while (it.hasNext()) {
+            count++;
+            Tuple t = it.next();
+
+            assertEquals(1, t.size());
+
+        }
+
+        // we filter out 2 date partitions using only 3
+        int expectedRows = fileRecords * 3 * fileTypes.length;
+
+        // the projection must yield one row per loaded record; previously the
+        // counter was incremented but never checked
+        assertEquals(expectedRows, count);
+
+        readRecordsFromLoader(server, "a", expectedRows);
+    }
+
+    /**
+     * Test that we can filter a date partitioned directory by a date range
+     * when the load statement declares no schema.
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void testDateParitionFilterWithoutSchema() throws IOException {
+
+        server.registerQuery("a = LOAD '"
+                + datePartitionDir.getAbsolutePath()
+                + "' using "
+                + allLoaderName
+                + "('daydate >= \"2010-11-02\" and daydate <= \"2010-11-04\"'); ");
+
+        server.registerQuery("r = FOREACH a GENERATE $0;");
+
+        Iterator<Tuple> it = server.openIterator("r");
+
+        int count = 0;
+
+        while (it.hasNext()) {
+            count++;
+            Tuple t = it.next();
+
+            // the first column is expected to hold a value below 1
+            float f = Float.valueOf(t.get(0).toString());
+            assertTrue(f < 1L);
+            assertEquals(1, t.size());
+
+        }
+
+        // we filter out 2 date partitions using only 3
+        int expectedRows = fileRecords * 3 * fileTypes.length;
+
+        // the projection must yield one row per loaded record; previously the
+        // counter was incremented but never checked
+        assertEquals(expectedRows, count);
+
+        readRecordsFromLoader(server, "a", expectedRows);
+    }
+
+    /**
+     * Test that we can read a date partitioned directory
+     * 
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testDateParitionDir() throws IOException, ParseException {
+
+        server.registerQuery("a = LOAD '" + datePartitionDir.getAbsolutePath()
+                + "' using " + allLoaderFuncSpec
+                + " as (q:float, p:float, daydate:chararray);");
+
+        server.registerQuery("r = FOREACH a GENERATE daydate;");
+
+        Iterator<Tuple> it = server.openIterator("r");
+
+        DateFormat dateformat = new SimpleDateFormat("yyyy-MM-dd");
+
+        Calendar cal = Calendar.getInstance();
+
+        while (it.hasNext()) {
+            Date date = dateformat.parse(it.next().get(0).toString());
+            cal.setTime(date);
+            assertEquals(2010, cal.get(Calendar.YEAR));
+            assertEquals(10, cal.get(Calendar.MONTH)); // month starts at 0 so
+                                                        // November 11 is 10
+
+        }
+
+        readRecordsFromLoader(server, "a", fileRecords * datePartitions.length
+                * fileTypes.length);
+    }
+
+    /**
+     * Test that we can read from a simple directory
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void testSimpleDir() throws IOException {
+
+        server.registerQuery("a = LOAD '" + simpleDir.getAbsolutePath()
+                + "' using " + allLoaderFuncSpec + " as (q:float, p:float);");
+
+        readRecordsFromLoader(server, "a", fileRecords * fileTypes.length);
+
+    }
+
+    /**
+     * Validates that the loadAlias can read the correct amount of records
+     * 
+     * @param server
+     *            the server the alias is registered with
+     * @param loadAlias
+     *            the alias to iterate over
+     * @param totalRowCount
+     *            the number of records the alias is expected to return
+     * @throws IOException
+     */
+    private void readRecordsFromLoader(PigServer server, String loadAlias,
+            int totalRowCount) throws IOException {
+
+        Iterator<Tuple> result = server.openIterator(loadAlias);
+        int count = 0;
+
+        // use hasNext() like the other tests in this class instead of relying
+        // on next() returning null at the end of the data
+        while (result.hasNext()) {
+            result.next();
+            count++;
+        }
+
+        Log.info("Validating expected: " + totalRowCount + " against " + count);
+        assertEquals(totalRowCount, count);
+    }
+
+    /**
+     * Creates the test directories and the PigServer used by the tests.
+     * <p>
+     * The work is only done while baseDir is still null: an existing
+     * build/test/testAllLoader directory is deleted and recreated, the
+     * simple, date partitioned, logic partitioned, tagged and by-content
+     * directories are written, and a local mode PigServer is created with
+     * file.extension.loaders set to extensionLoaders and the AllLoader
+     * function registered.
+     */
+    @Before
+    public void setUp() throws Exception {
+
+        // NOTE(review): hadoopConf is not used after this call - presumably
+        // only kept for the default URI lookup; confirm it is still needed
+        Configuration hadoopConf = new Configuration();
+        FileSystem.setDefaultUri(hadoopConf,
+                LocalFileSystem.getDefaultUri(hadoopConf));
+
+        if (baseDir == null) {
+            configuration = new Properties();
+            configuration.setProperty(LoadFuncHelper.FILE_EXTENSION_LOADERS,
+                    extensionLoaders);
+
+            baseDir = new File("build/test/testAllLoader");
+            // start from an empty directory
+            if (baseDir.exists()) {
+                FileUtil.fullyDelete(baseDir);
+            }
+
+            assertTrue(baseDir.mkdirs());
+
+            createSimpleDir();
+            createDatePartitionDir();
+            createLogicPartitionDir();
+            createTaggedLogicPartitionDir();
+            createFileByContentDir();
+
+            server = new PigServer(ExecType.LOCAL, configuration);
+
+            server.setBatchOn();
+            server.registerFunction(allLoaderName, new FuncSpec(
+                    allLoaderFuncSpec));
+        }
+    }
+
+    /**
+     * Creates the filesByContentDir directory and writes one HIVERC file
+     * without an extension into it; the file is named after the current time
+     * in milliseconds.
+     * 
+     * @throws IOException
+     */
+    private void createFileByContentDir() throws IOException {
+
+        filesByContentDir = new File(baseDir, "filesByContentDir");
+        assertTrue(filesByContentDir.mkdirs());
+
+        // the file name carries no extension, so only its content identifies
+        // the type
+        String uniqueName = String.valueOf(System.currentTimeMillis());
+        File extensionLessFile = new File(filesByContentDir, uniqueName);
+
+        TYPE.HIVERC.writer.writeTestData(extensionLessFile, fileRecords,
+                fileColumns, colSeparator);
+    }
+
+    /**
+     * Write out the tagged logical partitioning directories e.g.
+     * type1/block=1, with one file per fileType:TYPE in each of them
+     * 
+     * @throws IOException
+     */
+    private void createTaggedLogicPartitionDir() throws IOException {
+        taggedLogicPartitionDir = new File(baseDir, "taggedLogicPartitionDir");
+        assertTrue(taggedLogicPartitionDir.mkdirs());
+
+        for (String tagName : tags) {
+            File tagRoot = new File(taggedLogicPartitionDir, tagName);
+
+            // one directory per logic partition below every tag directory
+            for (String partitionName : logicPartitions) {
+                File partitionRoot = new File(tagRoot, partitionName);
+                assertTrue(partitionRoot.mkdirs());
+
+                for (TYPE type : fileTypes) {
+                    writeFile(partitionRoot, type);
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds the logical partition layout: one directory per logic partition
+     * under "logicPartitionDir", each filled with one file per file type.
+     * 
+     * @throws IOException
+     *             if a test file cannot be written
+     */
+    private void createLogicPartitionDir() throws IOException {
+        logicPartitionDir = new File(baseDir, "logicPartitionDir");
+        assertTrue(logicPartitionDir.mkdirs());
+
+        for (String partitionName : logicPartitions) {
+            File partitionRoot = new File(logicPartitionDir, partitionName);
+            assertTrue(partitionRoot.mkdirs());
+
+            for (TYPE kind : fileTypes) {
+                writeFile(partitionRoot, kind);
+            }
+        }
+    }
+
+    /**
+     * Builds the date partition layout: one directory per date partition,
+     * each filled with one file per file type.
+     * 
+     * @throws IOException
+     *             if a test file cannot be written
+     */
+    private void createDatePartitionDir() throws IOException {
+        datePartitionDir = new File(baseDir, "dateParitionDir");
+        assertTrue(datePartitionDir.mkdirs());
+
+        for (String partitionName : datePartitions) {
+            File partitionRoot = new File(datePartitionDir, partitionName);
+            assertTrue(partitionRoot.mkdirs());
+
+            for (TYPE kind : fileTypes) {
+                writeFile(partitionRoot, kind);
+            }
+        }
+    }
+
+    /**
+     * Builds the flat layout: a single "simpleDir" directory holding one file
+     * per file type.
+     * 
+     * @throws IOException
+     *             if a test file cannot be written
+     */
+    private void createSimpleDir() throws IOException {
+        simpleDir = new File(baseDir, "simpleDir");
+        assertTrue(simpleDir.mkdirs());
+
+        for (TYPE kind : fileTypes) {
+            writeFile(simpleDir, kind);
+        }
+    }
+
+    /**
+     * Writes one test data file of the given type into a directory. The file
+     * name is the current time in milliseconds immediately followed by
+     * type.extension; no separator is added between the two here.
+     * 
+     * @param dir
+     *            directory that receives the file
+     * @param type
+     *            file type supplying both the extension and the writer
+     * @throws IOException
+     *             if the test data cannot be written
+     */
+    private void writeFile(File dir, TYPE type) throws IOException {
+        String fileName = System.currentTimeMillis() + type.extension;
+        File target = new File(dir, fileName);
+
+        type.writer.writeTestData(target, fileRecords, fileColumns,
+                colSeparator);
+    }
+
+    /**
+     * Shuts down the PigServer and removes the test base directory.
+     * <p>
+     * Both steps are null-guarded so that a setUp which failed part-way (for
+     * example before the server was created) does not produce a secondary
+     * NullPointerException here. The directory cleanup and the reset of
+     * baseDir run in a finally block so they still happen when the server
+     * shutdown throws; otherwise the next setUp would skip rebuilding the
+     * fixtures.
+     * 
+     * @throws Exception
+     *             if the server shutdown fails
+     */
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (server != null) {
+                server.shutdown();
+            }
+        } finally {
+            if (baseDir != null) {
+                FileUtil.fullyDelete(baseDir);
+            }
+            // forces the next setUp to recreate directories and server
+            baseDir = null;
+        }
+    }
+
+    /**
+     * Simple interface to help with writing test data. Each implementation
+     * produces one file format.
+     * 
+     */
+    private static interface FileTestWriter {
+        /**
+         * Writes generated test records to the given file.
+         * 
+         * @param file
+         *            file to write to
+         * @param recordCounts
+         *            number of records (rows) to write
+         * @param columnCount
+         *            number of columns in each record
+         * @param colSeparator
+         *            text placed between columns; an implementation may not
+         *            need it
+         * @throws IOException
+         *             if the file cannot be written
+         */
+        void writeTestData(File file, int recordCounts, int columnCount,
+                String colSeparator) throws IOException;
+    }
+
+    /**
+     * Writes gzip compressed text: one line per record, with random numbers
+     * as column values joined by the column separator.
+     * 
+     */
+    private static class GzipFileTestWriter implements FileTestWriter {
+
+        @Override
+        public void writeTestData(File file, int recordCounts, int columnCount,
+                String colSeparator) throws IOException {
+
+            CompressionOutputStream gzipOut = new GzipCodec()
+                    .createOutputStream(new FileOutputStream(file));
+            BufferedWriter lineWriter = new BufferedWriter(
+                    new OutputStreamWriter(gzipOut));
+
+            try {
+                for (int row = 0; row < recordCounts; row++) {
+                    // assemble the whole record, then hand it over in one go
+                    StringBuilder line = new StringBuilder();
+
+                    for (int col = 0; col < columnCount; col++) {
+                        if (col > 0) {
+                            line.append(colSeparator);
+                        }
+                        line.append(Math.random());
+                    }
+
+                    line.append("\n");
+                    lineWriter.append(line);
+                }
+            } finally {
+                lineWriter.close();
+                gzipOut.close();
+            }
+        }
+
+    }
+
+    /**
+     * Writes an RCFile on the local file system whose column values are
+     * random numbers stored as the bytes of their string form. The column
+     * separator is not used by this writer.
+     * 
+     */
+    private static class HiveRCFileTestWriter implements FileTestWriter {
+
+        @Override
+        public void writeTestData(File file, int recordCounts, int columnCount,
+                String colSeparator) throws IOException {
+
+            Configuration rcConf = new Configuration();
+            FileSystem localFs = FileSystem.getLocal(rcConf);
+
+            // the column number is set on the configuration before the
+            // writer is constructed from it
+            RCFileOutputFormat.setColumnNumber(rcConf, columnCount);
+            Path target = new Path(file.getAbsolutePath());
+            RCFile.Writer rcWriter = new RCFile.Writer(localFs, rcConf, target);
+
+            // a single row buffer is allocated once and refilled per record
+            BytesRefArrayWritable row = new BytesRefArrayWritable(columnCount);
+            for (int col = 0; col < columnCount; col++) {
+                row.set(col, new BytesRefWritable());
+            }
+
+            try {
+                for (int rec = 0; rec < recordCounts; rec++) {
+                    for (int col = 0; col < columnCount; col++) {
+                        byte[] cell = String.valueOf(Math.random()).getBytes();
+                        row.get(col).set(cell, 0, cell.length);
+                    }
+                    rcWriter.append(row);
+                }
+            } finally {
+                rcWriter.close();
+            }
+        }
+
+    }
+
+}

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLoadFuncHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLoadFuncHelper.java?rev=1035726&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLoadFuncHelper.java (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLoadFuncHelper.java Tue Nov 16 18:23:23 2010
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.piggybank.storage.allloader.LoadFuncHelper;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * 
+ * Tests that the LoadFunchelper's public method returns the values required.
+ */
+public class TestLoadFuncHelper extends TestCase {
+
+    private static final String LOADER = "org.apache.pig.builtin.PigStorage()";
+    private static final String extensionLoaders = "txt:" + LOADER;
+    Configuration configuration;
+    LoadFuncHelper helper = null;
+
+    File baseDir;
+
+    File testFile;
+
+    @Test
+    public void testDetermineFunctionSingleArg() throws IOException {
+
+        FuncSpec funcSpec = helper.determineFunction(baseDir.getAbsolutePath());
+
+        assertNotNull(funcSpec);
+        assertEquals(LOADER, funcSpec.toString() + "()");
+
+    }
+
+    @Test
+    public void testDetermineFunction() throws IOException {
+
+        Path firstFile = helper.determineFirstFile(baseDir.getAbsolutePath());
+        FuncSpec funcSpec = helper.determineFunction(baseDir.getAbsolutePath(),
+                firstFile);
+
+        assertNotNull(funcSpec);
+        assertEquals(LOADER, funcSpec.toString() + "()");
+
+    }
+
+    @Test
+    public void testDetermineFirstFile() throws IOException {
+
+        Path path = helper.determineFirstFile(baseDir.getAbsolutePath());
+
+        assertNotNull(path);
+        assertEquals(testFile.getAbsolutePath(), path.toUri().toURL().getFile());
+
+    }
+
+    @Before
+    public void setUp() throws Exception {
+
+        if (baseDir == null) {
+
+            // we need this here for some strange reason while running ant test
+            // the FileSystem.get call in the LoadFuncHelper will try and
+            // connect to localhost??
+            PigServer pig = new PigServer(ExecType.LOCAL);
+
+            configuration = new Configuration(false);
+
+            configuration.set(LoadFuncHelper.FILE_EXTENSION_LOADERS,
+                    extensionLoaders);
+
+            helper = new LoadFuncHelper(configuration);
+
+            baseDir = new File("build/test/testLoadFuncHelper");
+            if (baseDir.exists()) {
+                FileUtil.fullyDelete(baseDir);
+            }
+
+            assertTrue(baseDir.mkdirs());
+
+            testFile = new File(baseDir, "testFile.txt");
+            FileWriter writer = new FileWriter(testFile);
+            try {
+                for (int i = 0; i < 100; i++) {
+                    writer.append("test test\n");
+                }
+            } finally {
+                writer.close();
+            }
+
+        }
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        FileUtil.fullyDelete(baseDir);
+        baseDir = null;
+    }
+
+}