You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2007/12/13 00:10:53 UTC

svn commit: r603773 [1/2] - in /incubator/pig/branches/plan: src/org/apache/pig/backend/ src/org/apache/pig/backend/datastorage/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/ src/org/apache/pig/backend/hadoop/datastorag...

Author: olga
Date: Wed Dec 12 15:10:51 2007
New Revision: 603773

URL: http://svn.apache.org/viewvc?rev=603773&view=rev
Log:
PIG-32: incremental changes to split logical and physical plan; second take

Added:
    incubator/pig/branches/plan/src/org/apache/pig/backend/
    incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/
    incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorage.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageContainerDescriptor.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageElementDescriptor.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageException.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/ImmutableOutputStream.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/SeekableInputStream.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/
    incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineException.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalOperator.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalPlan.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineNotificationEvent.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEnginePhysicalPlan.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/
    incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/
    incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HConfiguration.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDirectory.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/executionengine/
    incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/local/
    incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/
    incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDir.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalFile.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalPath.java
    incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalSeekableInputStream.java
    incubator/pig/branches/plan/test/org/apache/pig/test/TestAbstractionHadoopDataStorage.java
    incubator/pig/branches/plan/test/org/apache/pig/test/TestAbstractionLocalDataStorage.java

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorage.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorage.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorage.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,110 @@
+package org.apache.pig.backend.datastorage;
+
+import java.util.Properties;
+import java.io.IOException;
+
+public interface DataStorage {
+        
+        public static final String DEFAULT_REPLICATION_FACTOR_KEY = "default.replication.factor";
+        public static final String USED_BYTES_KEY = "used.bytes";
+
+        //
+        // TODO: more keys
+        //
+        
+        /**
+         * Place holder for possible initialization activities.
+         */
+        public void init();
+
+        /**
+         * Clean-up and releasing of resources.
+         */
+        public void close() throws IOException;
+        
+        /**
+         * Provides configuration information about the storage itself.
+         * For instance global data-replication policies if any, default
+         * values, ... Some of such values could be overridden at a finer 
+         * granularity (e.g. on a specific object in the Data Storage)
+         * 
+         * @return - configuration information
+         */
+        public Properties getConfiguration();
+        
+        /**
+         * Provides a way to change configuration parameters
+         * at the Data Storage level. For instance, change the 
+         * data replication policy.
+         * 
+         * @param newConfiguration - the new configuration settings
+         * @throws when configuration conflicts are detected
+         * 
+         */
+        public void updateConfiguration(Properties newConfiguration) 
+             throws DataStorageException;
+        
+        /**
+         * Provides statistics on the Storage: capacity values, how much 
+         * storage is in use...
+         * @return statistics on the Data Storage
+         */
+        public Properties getStatistics() throws IOException;
+                
+        /**
+         * Creates an entity handle for an object (no containment
+         * relation) from a String
+         *
+         * @param name of the object
+         * @return an object descriptor
+         * @throws DataStorageException if name does not conform to naming 
+         *         convention enforced by the Data Storage.
+         */
+        public DataStorageElementDescriptor asElement(String name) 
+            throws DataStorageException;
+
+        public DataStorageElementDescriptor asElement(DataStorageElementDescriptor element)
+            throws DataStorageException;
+        
+        public DataStorageElementDescriptor asElement(String parent,
+                                                      String child) 
+            throws DataStorageException;
+
+        public DataStorageElementDescriptor asElement(DataStorageContainerDescriptor parent,
+                                                      String child) 
+            throws DataStorageException;
+
+        public DataStorageElementDescriptor asElement(DataStorageContainerDescriptor parent,
+                                                      DataStorageElementDescriptor child)
+            throws DataStorageException;
+        
+        /**
+         * Created an entity handle for a container.
+         * 
+         * @param name of the container
+         * @return a container descriptor
+         * @throws DataStorageException if name does not conform to naming 
+         *         convention enforced by the Data Storage.
+         */
+        public DataStorageContainerDescriptor asContainer(String name) 
+            throws DataStorageException;
+        
+        public DataStorageContainerDescriptor asContainer(DataStorageContainerDescriptor container)
+            throws DataStorageException;
+        
+        public DataStorageContainerDescriptor asContainer(String parent,
+                                                          String child) 
+            throws DataStorageException;
+
+        public DataStorageContainerDescriptor asContainer(DataStorageContainerDescriptor parent,
+                                                          String child) 
+            throws DataStorageException;
+        
+        public DataStorageContainerDescriptor asContainer(DataStorageContainerDescriptor parent,
+                                                          DataStorageContainerDescriptor child) 
+            throws DataStorageException;
+
+        public void setActiveContainer(DataStorageContainerDescriptor container);
+        
+        public DataStorageContainerDescriptor getActiveContainer();
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageContainerDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageContainerDescriptor.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageContainerDescriptor.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageContainerDescriptor.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,7 @@
+
+package org.apache.pig.backend.datastorage;
+
+public interface DataStorageContainerDescriptor 
+                 extends DataStorageElementDescriptor, 
+                 Iterable<DataStorageElementDescriptor> {
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageElementDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageElementDescriptor.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageElementDescriptor.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageElementDescriptor.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,120 @@
+package org.apache.pig.backend.datastorage;
+
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+import java.util.Properties;
+
+public interface DataStorageElementDescriptor extends 
+            Comparable<DataStorageElementDescriptor> {
+    
+        public static final String BLOCK_SIZE_KEY = "path.block.size";
+        public static final String BLOCK_REPLICATION_KEY = "path.block.replication";
+        public static final String LENGTH_KEY = "path.length";
+        public static final String MODIFICATION_TIME_KEY = "path.modification.time";
+        
+        //
+        // TODO: more keys
+        //
+        
+        public DataStorage getDataStorage();
+    
+        /**
+         * Opens a stream onto which an entity can be written to.
+         * 
+         * @param configuration information at the object level
+         * @return stream where to write
+         * @throws DataStorageException
+         */
+        public OutputStream create(Properties configuration) 
+             throws IOException;
+
+        public OutputStream create() 
+            throws IOException;
+
+        /**
+         * Copy entity from an existing one, possibly residing in a 
+         * different Data Storage.
+         * 
+         * @param dstName name of entity to create
+         * @param dstConfiguration configuration for the new entity
+         * @param removeSrc if src entity needs to be removed after copying it
+         * @throws DataStorageException for instance, configuration 
+         *         information for new entity is not compatible with 
+         *         configuration information at the Data
+         *         Storage level, user does not have privileges to read from
+         *         source entity or write to destination storage...
+         */
+        public void copy(DataStorageElementDescriptor dstName,
+                         Properties dstConfiguration,
+                         boolean removeSrc) 
+            throws IOException;
+        
+        public void copy(DataStorageElementDescriptor dstName,
+                         boolean removeSrc) 
+            throws IOException;
+                
+        /**
+         * Open for read a given entity
+         * 
+         * @return entity to read from
+         * @throws DataStorageExecption e.g. entity does not exist...
+         */
+        public InputStream open() throws IOException;
+
+        /**
+         * Open an element in the Data Storage with support for random access 
+         * (seek operations).
+         * 
+         * @return a seekable input stream
+         * @throws DataStorageException
+         */
+        public SeekableInputStream sopen() 
+             throws IOException;
+        
+        /**
+         * Checks whether the entity exists or not
+         * 
+         * @param name of entity
+         * @return true if entity exists, false otherwise.
+         */
+        public boolean exists() throws IOException;
+        
+        /**
+         * Changes the name of an entity in the Data Storage
+         * 
+         * @param newName new name of entity 
+         * @throws DataStorageException 
+         */
+        public void rename(DataStorageElementDescriptor newName) 
+             throws IOException;
+
+        /**
+         * Remove entity from the Data Storage.
+         * 
+         * @throws DataStorageException
+         */
+        public void delete() throws IOException;
+
+        /**
+         * Retrieve configuration information for entity
+         * @return configuration
+         */
+        public Properties getConfiguration() throws IOException;
+
+        /**
+         * Update configuration information for this entity
+         *
+         * @param newConfig configuration
+         * @throws DataStorageException
+         */
+        public void updateConfiguration(Properties newConfig) 
+             throws IOException;
+        
+        /**
+         * List entity statistics
+         * @return DataStorageProperties
+         */
+        public Properties getStatistics() throws IOException;
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageException.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageException.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageException.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageException.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,22 @@
+package org.apache.pig.backend.datastorage;
+
+public class DataStorageException extends Throwable {
+
+    static final long serialVersionUID = 1;
+    
+    public DataStorageException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DataStorageException() {
+        this(null, null);
+    }
+    
+    public DataStorageException(String message) {
+        this(message, null);
+    }
+    
+    public DataStorageException(Throwable cause) {
+        this(null, cause);
+    }
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/ImmutableOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/ImmutableOutputStream.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/ImmutableOutputStream.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/ImmutableOutputStream.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,18 @@
+package org.apache.pig.backend.datastorage;
+
+import java.io.OutputStream;
+import java.io.IOException;
+
+public class ImmutableOutputStream extends OutputStream {
+    
+    private String destination;
+    
+    public ImmutableOutputStream(String destination) {
+        super();
+        this.destination = destination;
+    }
+    
+    public void write(int b) throws IOException {
+        throw new IOException("Write not supported on " + destination);
+    }
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/SeekableInputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/SeekableInputStream.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/SeekableInputStream.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/SeekableInputStream.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,33 @@
+package org.apache.pig.backend.datastorage;
+
+import java.io.InputStream;
+import java.io.IOException;
+
+public abstract class SeekableInputStream extends InputStream {
+    
+    public enum FLAGS {
+        SEEK_SET,
+        SEEK_CUR,
+        SEEK_END,
+    }
+    
+    /**
+     * Seeks to a given offset as specified by whence flags.
+     * If whence is SEEK_SET, offset is added to beginning of stream
+     * If whence is SEEK_CUR, offset is added to current position inside stream
+     * If whence is SEEK_END, offset is added to end of file position
+     * 
+     * @param offset
+     * @param whence
+     * @throws IOException
+     */
+    public abstract void seek(long offset, FLAGS whence) throws IOException;
+    
+    /**
+     * Returns current offset
+     * 
+     * @return offset
+     * @throws IOException
+     */
+    public abstract long tell() throws IOException;
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,76 @@
+package org.apache.pig.backend.executionengine;
+
+import java.util.Collection;
+import java.util.Properties;
+
+/**
+ * This is the main interface that various execution engines
+ * need to support and it is also the main interface that Pig
+ * will need to use to submit jobs for execution, retrieve information
+ * about their progress and possibly terminate them.
+ *
+ */
+public interface ExecutionEngine {
+    /**
+     * Place holder for possible initialization activities.
+     */
+    public void init();
+
+    /**
+     * Clean-up and releasing of resources.
+     */
+    public void close();
+
+        
+    /**
+     * Provides configuration information about the execution engine itself.
+     * 
+     * @return - information about the configuration used to connect to execution engine
+     */
+    public Properties getConfiguration();
+        
+    /**
+     * Provides a way to dynamically change configuration parameters
+     * at the Execution Engine level.
+     * 
+     * @param newConfiguration - the new configuration settings
+     * @throws when configuration conflicts are detected
+     * 
+     */
+    public void updateConfiguration(Properties newConfiguration) 
+        throws ExecutionEngineException;
+        
+    /**
+     * Provides statistics on the Execution Engine: number of nodes,
+     * node failure rates, average load, average job wait time...
+     * @return ExecutionEngineProperties
+     */
+    public Properties getStatistics() throws ExecutionEngineException;
+
+    /**
+     * Compiles a logical plan into a physical plan, given a set of configuration
+     * properties that apply at the plan-level. For instance desired degree of 
+     * parallelism for this plan, which could be different from the "default"
+     * one set at the execution engine level.
+     * 
+     * @param logical plan
+     * @param properties
+     * @return physical plan
+     */
+    public ExecutionEnginePhysicalPlan compile(ExecutionEngineLogicalPlan plan,
+                                               Properties properties)
+        throws ExecutionEngineException;
+        
+    /**
+     * This may be useful to support admin functionalities.
+     * 
+     * @return a collection of jobs "known" to the execution engine,
+     * say jobs currently queued up or running (this can be determined 
+     * by the obtaining the properties of the job)
+     * 
+     * @throws ExecutionEngineException maybe the user does not have privileges
+     * to obtain this information...
+     */
+    public Collection<ExecutionEnginePhysicalPlan> physicalPlans () 
+        throws ExecutionEngineException;
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineException.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineException.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineException.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineException.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,21 @@
+package org.apache.pig.backend.executionengine;
+
+public class ExecutionEngineException extends Throwable {
+    static final long serialVersionUID = 1;
+    
+    public ExecutionEngineException (String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ExecutionEngineException() {
+        this(null, null);
+    }
+    
+    public ExecutionEngineException(String message) {
+        this(message, null);
+    }
+    
+    public ExecutionEngineException(Throwable cause) {
+        this(null, cause);
+    }
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalOperator.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalOperator.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalOperator.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,12 @@
+package org.apache.pig.backend.executionengine;
+
+import java.io.Serializable;
+
+public interface ExecutionEngineLogicalOperator extends Serializable {
+
+    // catenation of group and name is a GUId for the operator
+    public String getScope();
+    public long getId();
+    
+    public ExecutionEngineLogicalOperator childAt(int position);
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalPlan.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalPlan.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalPlan.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,5 @@
+package org.apache.pig.backend.executionengine;
+
+public interface ExecutionEngineLogicalPlan {
+
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineNotificationEvent.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineNotificationEvent.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineNotificationEvent.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineNotificationEvent.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,5 @@
+package org.apache.pig.backend.executionengine;
+
+public interface ExecutionEngineNotificationEvent {
+
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEnginePhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEnginePhysicalPlan.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEnginePhysicalPlan.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEnginePhysicalPlan.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,96 @@
+package org.apache.pig.backend.executionengine;
+
+import java.util.Properties;
+import java.util.Iterator;
+
+import org.apache.pig.data.Tuple;
+
+public interface ExecutionEnginePhysicalPlan {
+
+    /*
+     * As the fron end is not really supposed to do much with the physical representation
+     * the handle to the physical plan can just be an GUId in the back-end state, which
+     * brings up the problem is persistency...
+     */
+    
+    
+    /**
+     * Execute the physical plan.
+     * This is non-blocking. See getStatistics to pull information
+     * about the job.
+     * 
+     * @throws
+     */
+    public void execute() throws ExecutionEngineException;
+
+    public void submit() throws ExecutionEngineException;
+    
+    /**
+     * A job may have properties, like a priority, degree of parallelism...
+     * Some of such properties may be inherited from the ExecutionEngine
+     * configuration, other may have been set specifically for this job.
+     * For instance, a job scheduler may attribute low priority to
+     * jobs automatically started for maintenance purpose.
+     * 
+     * @return set of properties
+     */
+    public Properties getConfiguration();
+    
+    /**
+     * true is the physical plan has executed successfully and results are ready
+     * to be retireved
+     * 
+     * @return
+     * @throws ExecutionEngineException
+     */
+    public boolean hasExecuted() throws ExecutionEngineException;
+    
+    /**
+     * if query has executed successfully we want to retireve the results
+     * via iterating over them. 
+     * 
+     * @return
+     * @throws ExecutionEngineException
+     */
+    public Iterator<Tuple> getResults() throws ExecutionEngineException;
+    
+    /**
+     * Some properties of the job may be changed, say the priority...
+     * 
+     * @param configuration
+     * @throws some changes may not be allowed, for instance the some
+     * job-level properties cannot override Execution-Engine-level properties
+     * or maybe some properties can only be changes only in certain states of the
+     * job, say, once the job is started, parallelism level may not be changed...
+     */
+    public void updateConfiguration(Properties configuration)
+        throws ExecutionEngineException;
+    
+    /**
+     * Hook to provide asynchronous notifications.
+     * 
+     */
+    public void notify(ExecutionEngineNotificationEvent event);
+        
+    /**
+     * Kills current job.
+     * 
+     * @throws ExecutionEngineException
+     */
+    public void kill() throws ExecutionEngineException;
+        
+    /**
+     * Can be information about the state (not submitted, e.g. the execute method
+     * has not been called yet; not running, e.g. execute has been issued, 
+     * but job is waiting; running...; completed; aborted...; progress information
+     * 
+     * @return
+     */
+    public Properties getStatistics();
+    
+    /**    
+     * To provide an "explanation" about how the physical plan has been constructed
+     * 
+     */
+    public void explain();
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HConfiguration.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HConfiguration.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HConfiguration.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,52 @@
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Enumeration;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Properties;
+
+
+public class HConfiguration extends Properties {
+    
+    private static final long serialVersionUID = 1L;
+        
+    public HConfiguration() {
+    }
+
+    //
+    // TODO: this implementation has a problem: it does not
+    //       respect final attributes
+    //
+    public HConfiguration(Configuration other) {
+        if (other != null) {
+            Iterator<Map.Entry<String, String>> iter = other.iterator();
+            
+            while (iter.hasNext()) {
+                Map.Entry<String, String> entry = iter.next();
+                
+                put(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+        
+    public Configuration getConfiguration() {
+        Configuration config = new Configuration();
+
+        Enumeration<Object> iter = keys();
+        
+        while (iter.hasMoreElements()) {
+            String key = (String) iter.nextElement();
+            String val = getProperty(key);
+           
+            config.set(key, val);
+        }
+
+        return config;
+    }
+}
+
+
+

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,162 @@
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.net.URI;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Enumeration;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.pig.backend.datastorage.*;
+
+
+public class HDataStorage implements DataStorage {
+        
+    private FileSystem fs;
+    
+    public HDataStorage(URI uri, Configuration conf) throws IOException {
+        this(uri, new HConfiguration(conf));
+    }
+    
+    public HDataStorage(URI uri, HConfiguration conf) throws IOException {
+        fs = FileSystem.get(uri, conf.getConfiguration());
+    }
+
+    public HDataStorage(Configuration conf) throws IOException {
+        this(new HConfiguration(conf));
+    }
+    
+    public HDataStorage(HConfiguration conf) throws IOException {
+        fs = FileSystem.get(conf.getConfiguration());
+    }
+
+    @Override
+    public void init() { }
+    
+    @Override
+    public void close() throws IOException {
+        fs.close();
+    }
+    
+    @Override
+    public Properties getConfiguration() {
+        Properties props = new HConfiguration(fs.getConf());
+                
+        short defaultReplication = fs.getDefaultReplication();
+        props.setProperty(DEFAULT_REPLICATION_FACTOR_KEY,
+                          (new Short(defaultReplication)).toString());
+        
+        return props;
+    }
+    
+    @Override
+    public void updateConfiguration(Properties newConfiguration) 
+            throws DataStorageException {        
+        if (newConfiguration == null) {
+            return;
+        }
+        
+        Enumeration<Object> newKeys = newConfiguration.keys();
+        
+        while (newKeys.hasMoreElements()) {
+            String key = (String) newKeys.nextElement();
+            String value = null;
+            
+            value = newConfiguration.getProperty(key);
+            
+            fs.getConf().set(key,value);
+        }
+    }
+    
+    @Override
+    public Properties getStatistics() throws IOException {
+        Properties stats = new Properties();
+        long bytes = fs.getUsed();
+        
+        stats.setProperty(USED_BYTES_KEY , new Long(bytes).toString());
+        
+        return stats;
+    }
+    
+    @Override
+    public DataStorageElementDescriptor asElement(String name) 
+            throws DataStorageException {
+        return new HFile(this, name);
+    }
+    
+    @Override
+    public DataStorageElementDescriptor asElement(DataStorageElementDescriptor element)
+            throws DataStorageException {
+        return new HFile(this, element.toString());
+    }
+    
+    @Override
+    public DataStorageElementDescriptor asElement(String parent,
+                                                  String child) 
+            throws DataStorageException {
+        return new HFile(this, parent, child);
+    }
+
+    @Override
+    public DataStorageElementDescriptor asElement(DataStorageContainerDescriptor parent,
+                                                  String child) 
+            throws DataStorageException {
+        return new HFile(this, parent.toString(), child);
+    }
+
+    @Override
+    public DataStorageElementDescriptor asElement(DataStorageContainerDescriptor parent,
+                                                  DataStorageElementDescriptor child) 
+            throws DataStorageException {
+        return new HFile(this, parent.toString(), child.toString());
+    }
+
+    @Override
+    public DataStorageContainerDescriptor asContainer(String name) 
+            throws DataStorageException {
+        return new HDirectory(this, name);
+    }
+    
+    @Override
+    public DataStorageContainerDescriptor asContainer(DataStorageContainerDescriptor container)
+            throws DataStorageException {
+        return new HDirectory(this, container.toString());
+    }
+    
+    @Override
+    public DataStorageContainerDescriptor asContainer(String parent,
+                                                      String child) 
+            throws DataStorageException {
+        return new HDirectory(this, parent, child);
+    }
+
+    @Override
+    public DataStorageContainerDescriptor asContainer(DataStorageContainerDescriptor parent,
+                                                      String child) 
+            throws DataStorageException {
+        return new HDirectory(this, parent.toString(), child);
+    }
+    
+    @Override
+    public DataStorageContainerDescriptor asContainer(DataStorageContainerDescriptor parent,
+                                                      DataStorageContainerDescriptor child)
+            throws DataStorageException {
+        return new HDirectory(this, parent.toString(), child.toString());
+    }
+    
+    @Override
+    public void setActiveContainer(DataStorageContainerDescriptor container) {
+        fs.setWorkingDirectory(new Path(container.toString()));
+    }
+    
+    @Override
+    public DataStorageContainerDescriptor getActiveContainer() {
+        return new HDirectory(this, fs.getWorkingDirectory());
+    }
+
+    public FileSystem getHFS() {
+        return fs;
+    }
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDirectory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDirectory.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDirectory.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDirectory.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,148 @@
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.pig.backend.datastorage.*;
+
+public class HDirectory extends HPath
+                        implements DataStorageContainerDescriptor {
+
+    public HDirectory(HDataStorage fs, Path parent, Path child) {
+        super(fs, parent, child);
+    }
+
+    public HDirectory(HDataStorage fs, String parent, String child) {
+        super(fs, parent, child);
+    }
+    
+    public HDirectory(HDataStorage fs, Path parent, String child) {
+        super(fs, parent, child);
+    }
+
+    public HDirectory(HDataStorage fs, String parent, Path child) {
+        super(fs, parent, child);
+    }
+        
+    public HDirectory(HDataStorage fs, Path path) {
+        super(fs, path);
+    }
+    
+    public HDirectory(HDataStorage fs, String pathString) {
+        super(fs, pathString);
+    }
+
+    @Override
+    public OutputStream create(Properties configuration) 
+            throws IOException {
+        fs.getHFS().mkdirs(path);
+        
+        return new ImmutableOutputStream(path.toString());
+    }
+    
+    @Override
+    public void copy(DataStorageElementDescriptor dstName,
+                     Properties dstConfiguration,
+                     boolean removeSrc)
+            throws IOException {
+        copy((DataStorageContainerDescriptor) dstName,
+             dstConfiguration,
+             removeSrc);
+    }
+    
+    
+    public void copy(DataStorageContainerDescriptor dstName,
+                     Properties dstConfiguration,
+                     boolean removeSrc)
+            throws IOException {
+        if (dstName == null) {
+            return;
+        }
+                    
+        if (!exists()) {
+            throw new IOException("Source does not exist " +
+                                  this);
+        }
+            
+        if (dstName.exists()) {
+            throw new IOException("Destination already exists " +
+                                  dstName);
+        }
+        
+        dstName.create();
+        
+        Iterator<DataStorageElementDescriptor> elems = iterator();
+    
+        try {
+            while (elems.hasNext()) {
+                DataStorageElementDescriptor curElem = elems.next();
+            
+                if (curElem instanceof DataStorageContainerDescriptor) {
+                    DataStorageContainerDescriptor dst =
+                        dstName.getDataStorage().asContainer(dstName,
+                                                             ((HPath)curElem).getPath().getName());
+                    
+                    curElem.copy(dst, dstConfiguration, removeSrc);
+                    
+                    if (removeSrc) {
+                        curElem.delete();
+                    }
+                }
+                else {
+                    DataStorageElementDescriptor dst = 
+                        dstName.getDataStorage().asElement(dstName,
+                                                           ((HPath)curElem).getPath().getName());
+                    
+                    curElem.copy(dst, dstConfiguration, removeSrc);
+                }
+            }
+        }
+        catch (DataStorageException e) {
+            throw new IOException("Failed to copy " + this + " to " + dstName, e);
+        }
+
+        if (removeSrc) {
+            delete();
+        }
+    }
+
+    @Override
+    public InputStream open() throws IOException {
+        throw new IOException("Cannot open dir " + path);
+    }
+
+    @Override
+    public SeekableInputStream sopen() throws IOException {
+        throw new IOException("Cannot sopen dir " + path);
+    }
+
+    public Iterator<DataStorageElementDescriptor> iterator() {
+        LinkedList<DataStorageElementDescriptor> elements =
+            new LinkedList<DataStorageElementDescriptor>();
+        
+        try {
+            Path[] paths = fs.getHFS().listPaths(new Path[]{path});
+            
+            for (Path p : paths) {
+                if (fs.getHFS().isFile(p)) {
+                    elements.add(fs.asElement(p.toString()));
+                }
+                else {
+                    elements.add(fs.asContainer(p.toString()));                    
+                }
+            }
+        }
+        catch (IOException e) {
+        }
+        catch (DataStorageException e) {
+        }
+        
+        return elements.iterator();
+    }
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HFile.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HFile.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HFile.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,105 @@
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.pig.backend.datastorage.DataStorageElementDescriptor;
+import org.apache.pig.backend.datastorage.DataStorageContainerDescriptor;
+import org.apache.pig.backend.datastorage.DataStorageException;
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+
+public class HFile extends HPath {
+    
+    public HFile(HDataStorage fs, Path parent, Path child) {
+        super(fs, parent, child);
+    }
+
+    public HFile(HDataStorage fs, String parent, String child) {
+        super(fs, parent, child);
+    }
+    
+    public HFile(HDataStorage fs, Path parent, String child) {
+        super(fs, parent, child);
+    }
+
+    public HFile(HDataStorage fs, String parent, Path child) {
+        super(fs, parent, child);
+    }
+        
+    public HFile(HDataStorage fs, String pathString) {
+        super(fs, pathString);
+    }
+        
+    public HFile(HDataStorage fs, Path path) {
+        super(fs, path);
+    }
+    
+    @Override
+    public OutputStream create(Properties configuration) 
+             throws IOException {
+        return fs.getHFS().create(path, false);
+    }
+    
+    @Override
+    public void copy(DataStorageElementDescriptor dstName,
+                     Properties dstConfiguration,
+                     boolean removeSrc) 
+            throws IOException {
+        if (dstName == null) {
+            return;
+        }
+        
+        if (!exists()) {
+            throw new IOException("Source does not exist " +
+                                  this);
+        }
+
+        if (dstName.exists()) {
+            if (dstName instanceof DataStorageContainerDescriptor) {
+                try {
+                    dstName = dstName.getDataStorage().
+                                      asElement((DataStorageContainerDescriptor) dstName,
+                                                getPath().getName());
+                }
+                catch (DataStorageException e) {
+                    throw new IOException("Unable to generate element name (src: " + 
+                                           this + ", dst: " + dstName + ")",
+                                          e);
+                }
+            }
+        }
+        
+        InputStream in = null;
+        OutputStream out = null;
+        
+        in = this.open();
+        out = dstName.create(dstConfiguration);
+            
+        byte[] data = new byte[4 * 1024];
+        int bc;
+        while((bc = in.read(data)) != -1) {
+            out.write(data, 0, bc);
+        }
+        
+        out.close();
+            
+        if (removeSrc) {
+            delete();
+        }
+    }
+    
+    @Override
+    public InputStream open() throws IOException {
+        return fs.getHFS().open(path);
+    }
+
+    @Override
+    public SeekableInputStream sopen() throws IOException {
+        return new HSeekableInputStream(fs.getHFS().open(path),
+                                        fs.getHFS().getContentLength(path));
+    }
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,153 @@
+
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.pig.backend.datastorage.*;
+
+public abstract class HPath implements DataStorageElementDescriptor {
+
+    protected Path path;
+    protected HDataStorage fs;
+
+    public HPath(HDataStorage fs, Path parent, Path child) {
+        this.path = new Path(parent, child);
+        this.fs = fs;
+    }
+
+    public HPath(HDataStorage fs, String parent, String child) {
+        this(fs, new Path(parent), new Path(child));
+    }
+    
+    public HPath(HDataStorage fs, Path parent, String child) {
+        this(fs, parent, new Path(child));
+    }
+
+    public HPath(HDataStorage fs, String parent, Path child) {
+        this(fs, new Path(parent), child);
+    }
+        
+    public HPath(HDataStorage fs, String pathString) {
+        this(fs, new Path(pathString));
+    }
+        
+    public HPath(HDataStorage fs, Path path) {
+        this.path = path;
+        this.fs = fs;
+    }
+
+    @Override
+    public DataStorage getDataStorage() {
+        return fs;
+    }
+    
+    @Override
+    public abstract OutputStream create(Properties configuration) 
+             throws IOException;
+    
+    @Override
+    public abstract void copy(DataStorageElementDescriptor dstName,
+                              Properties dstConfiguration,
+                              boolean removeSrc)
+        throws IOException;
+    
+    @Override
+    public abstract InputStream open() throws IOException;
+
+    @Override
+    public abstract SeekableInputStream sopen() throws IOException;
+
+    @Override
+    public boolean exists() throws IOException {
+        return fs.getHFS().exists(path);
+    }
+    
+    @Override
+    public void rename(DataStorageElementDescriptor newName) 
+             throws IOException {
+        if (newName != null) {
+            fs.getHFS().rename(path, ((HPath)newName).path);
+        }
+    }
+
+    @Override
+    public void delete() throws IOException {
+        fs.getHFS().delete(path);
+    }
+
+    @Override
+    public Properties getConfiguration() throws IOException {
+        HConfiguration props = new HConfiguration();
+
+        long blockSize = fs.getHFS().getFileStatus(path).getBlockSize();
+
+        short replication = fs.getHFS().getFileStatus(path).getReplication();
+        
+        props.setProperty(BLOCK_SIZE_KEY, (new Long(blockSize)).toString());
+        props.setProperty(BLOCK_REPLICATION_KEY, (new Short(replication)).toString());
+        
+        return props;
+    }
+
+    @Override
+    public void updateConfiguration(Properties newConfig) throws IOException {
+        if (newConfig == null) {
+            return;
+        }
+        
+        String blkReplStr = newConfig.getProperty(BLOCK_REPLICATION_KEY);
+        
+        fs.getHFS().setReplication(path, 
+                                   new Short(blkReplStr).shortValue());    
+    }
+
+    @Override
+    public Properties getStatistics() throws IOException {
+        HConfiguration props = new HConfiguration();
+        
+        Long length = new Long(fs.getHFS().getFileStatus(path).getLen());
+
+        Long modificationTime = new Long(fs.getHFS().getFileStatus(path).
+                                         getModificationTime());
+
+        props.setProperty(LENGTH_KEY, length.toString());
+        props.setProperty(MODIFICATION_TIME_KEY, modificationTime.toString());
+        
+        return props;
+    }
+
+    @Override
+    public int compareTo(DataStorageElementDescriptor other) {
+        return path.compareTo(other);
+    }
+
+    @Override
+    public OutputStream create() throws IOException {
+        return create(null);
+    }
+
+    @Override
+    public void copy(DataStorageElementDescriptor dstName,
+                     boolean removeSrc) 
+            throws IOException {
+        copy(dstName, null, removeSrc);
+    }
+    
+    public Path getPath() {
+        return path;
+    }
+    
+    public FileSystem getHFS() {
+        return fs.getHFS();
+    }
+    
+    public String toString() {
+        return path.toString();
+    }
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,94 @@
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+
+public class HSeekableInputStream extends SeekableInputStream {
+
+    protected FSDataInputStream input;
+    protected long contentLength;
+    
+    HSeekableInputStream(FSDataInputStream input,
+                         long contentLength) {
+        this.input = input;
+        this.contentLength = contentLength;
+    }
+    
+    @Override
+    public void seek(long offset, FLAGS whence) throws IOException {
+        long targetPos;
+        
+        switch (whence) {
+        case SEEK_SET: {
+            targetPos = offset;
+            break;
+        }
+        case SEEK_CUR: {
+            targetPos = input.getPos() + offset;
+            break;
+        }
+        case SEEK_END: {
+            targetPos = contentLength + offset;
+            break;
+        }
+        default: {
+            throw new IOException("Invalid seek option: " + whence);
+        }
+        }
+        
+        input.seek(targetPos);
+    }
+    
+    @Override
+    public long tell() throws IOException {
+        return input.getPos();
+    }
+    
+    @Override
+    public int read() throws IOException {
+        return input.read();
+    }
+    
+    @Override
+    public int read(byte[] b) throws IOException {
+        return input.read(b);
+    }
+        
+    @Override
+    public int read(byte[] b, int off, int len ) throws IOException {
+        return input.read(b, off, len);
+    }
+    
+    @Override
+    public int available() throws IOException {
+        return input.available();
+    }
+    
+    @Override
+    public long skip(long n) throws IOException {
+        return input.skip(n);
+    }
+    
+    @Override
+    public void close() throws IOException {
+        input.close();
+    }
+    
+    @Override
+    public void mark(int readlimit) {
+        input.mark(readlimit);
+    }
+    
+    @Override
+    public void reset() throws IOException {
+        input.reset();
+    }
+    
+    @Override
+    public boolean markSupported() {
+        return input.markSupported();
+    }
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,68 @@
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.executionengine.ExecutionEngineException;
+import org.apache.pig.backend.executionengine.ExecutionEngineLogicalPlan;
+import org.apache.pig.backend.executionengine.ExecutionEnginePhysicalPlan;
+
+import org.apache.pig.impl.physicalLayer.IntermedResult;
+
+public class HExecutionEngine implements ExecutionEngine {
+
+    protected Map<String, IntermedResult> aliases;
+    
+    public HExecutionEngine() {
+        aliases = new HashMap<String, IntermedResult>();
+    }
+    
+    public void init() {
+        // nothing to do
+    }
+
+    public void close() {
+        // TODO:
+        // check for running jobs and kill them all?
+    }
+
+    public Properties getConfiguration() {
+        // TODO
+        return null;
+    }
+        
+    public void updateConfiguration(Properties newConfiguration) 
+        throws ExecutionEngineException {
+        // TODO
+    }
+        
+    public Properties getStatistics() throws ExecutionEngineException {
+        // TODO
+        Properties stats = new Properties();
+        return stats;
+    }
+
+    public ExecutionEnginePhysicalPlan compile(ExecutionEngineLogicalPlan plan,
+                                               Properties properties)
+            throws ExecutionEngineException {
+        // TODO
+        // first step is to visit the logical plan:
+        //      if LOSplit: need to add more aliases in the table
+        //
+        // compilation process uses the alias table
+        //
+        // emit the physical representation, instead of finding the intermed results in LORead
+        // we need to look into the table.
+        //
+        return null;
+    }
+    
+    public Collection<ExecutionEnginePhysicalPlan> physicalPlans () 
+        throws ExecutionEngineException {
+        // TODO
+        return null;
+    }
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,131 @@
+package org.apache.pig.backend.local.datastorage;
+
+import java.io.IOException;
+import java.io.File;
+import java.util.Properties;
+
+import org.apache.pig.backend.datastorage.*;
+
+public class LocalDataStorage implements DataStorage {
+
+    protected File workingDir;
+    
+    public LocalDataStorage() {
+        workingDir = new File(System.getProperty("user.dir"));
+    }
+    
+    @Override
+    public void init() {
+        ;
+    }
+
+    @Override
+    public void close() throws IOException {
+        ;
+    }
+    
+    @Override
+    public Properties getConfiguration() {
+        Properties config = new Properties();
+        
+        config.put(DEFAULT_REPLICATION_FACTOR_KEY, (new Integer(1)).toString());
+        
+        return config;
+    }
+    
+    @Override
+    public void updateConfiguration(Properties newConfiguration) 
+         throws DataStorageException {
+        ;
+    }
+    
+    @Override
+    public Properties getStatistics() throws IOException {
+        Properties stats = new Properties();
+        
+        //TODO determine used bytes in order to set the value of the
+        // key USED_BYTES_KEY
+        
+        return stats;
+    }
+            
+    @Override
+    public LocalFile asElement(String name) 
+            throws DataStorageException {
+        return new LocalFile(this, name);
+    }
+
+    @Override
+    public LocalFile asElement(DataStorageElementDescriptor element)
+            throws DataStorageException {
+        return new LocalFile(this, element.toString());
+    }
+    
+    @Override
+    public LocalFile asElement(String parent,
+                               String child) 
+            throws DataStorageException {
+        return new LocalFile(this,parent, child);
+    }
+
+    @Override
+    public LocalFile asElement(DataStorageContainerDescriptor parent,
+                               String child) 
+            throws DataStorageException {
+        return new LocalFile(this, parent.toString(), child);
+    }
+
+    @Override
+    public LocalFile asElement(DataStorageContainerDescriptor parent,
+                               DataStorageElementDescriptor child)
+            throws DataStorageException {
+        return new LocalFile(this, parent.toString(), child.toString());
+    }
+    
+    @Override
+    public LocalDir asContainer(String name) 
+            throws DataStorageException {
+        return new LocalDir(this, name);
+    }
+    
+    @Override
+    public LocalDir asContainer(DataStorageContainerDescriptor container)
+            throws DataStorageException {
+        return new LocalDir(this, container.toString());
+    }
+    
+    @Override
+    public LocalDir asContainer(String parent,
+                                String child) 
+            throws DataStorageException {
+        return new LocalDir(this, parent, child);
+    }
+
+    @Override
+    public LocalDir asContainer(DataStorageContainerDescriptor parent,
+                                String child) 
+            throws DataStorageException {
+        return new LocalDir(this, parent.toString(), child);
+    }
+    
+    @Override
+    public LocalDir asContainer(DataStorageContainerDescriptor parent,
+                                DataStorageContainerDescriptor child) 
+        throws DataStorageException {
+        return new LocalDir(this, parent.toString(), child.toString());
+    }
+
+    @Override
+    public void setActiveContainer(DataStorageContainerDescriptor container) {
+    	this.workingDir = new File(container.toString());
+    }
+    
+    @Override
+    public DataStorageContainerDescriptor getActiveContainer() {
+        return new LocalDir(this, this.workingDir.getPath());
+    }
+    
+    public File getWorkingDir() {
+        return this.workingDir;
+    }
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDir.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDir.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDir.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDir.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,154 @@
+package org.apache.pig.backend.local.datastorage;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.Iterator;
+
+import org.apache.pig.backend.datastorage.DataStorageElementDescriptor;
+import org.apache.pig.backend.datastorage.DataStorageContainerDescriptor;
+import org.apache.pig.backend.datastorage.DataStorageException;
+import org.apache.pig.backend.datastorage.ImmutableOutputStream;
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+
+public class LocalDir extends LocalPath
+                      implements DataStorageContainerDescriptor {
+
+    public LocalDir(LocalDataStorage fs, String path) {
+        super(fs, path);
+    }
+    
+    public LocalDir(LocalDataStorage fs, File path) {
+        super(fs, path);
+    }
+
+    public LocalDir(LocalDataStorage fs, String parent, String child) {
+        super(fs, parent, child);
+    }
+    
+    public LocalDir(LocalDataStorage fs, File parent, File child) {
+        super(fs,
+              parent.getPath(),
+              child.getPath());
+    }
+    
+    public LocalDir(LocalDataStorage fs, File parent, String child) {
+        this(fs, parent.getPath(), child);
+    }
+    
+    public LocalDir(LocalDataStorage fs, String parent, File child) {
+        this(fs, parent, child.getPath());
+    }
+    
+    @Override
+    public OutputStream create(Properties configuration) 
+            throws IOException {
+        if (! getCurPath().mkdirs()) {
+            throw new IOException("Unable to create dirs for: " + this.path);
+        }
+        
+        return new ImmutableOutputStream(path.toString());
+    }
+
+    @Override
+    public void copy(DataStorageElementDescriptor dstName,
+                     Properties dstConfiguration,
+                     boolean removeSrc) 
+            throws IOException {
+        copy((DataStorageContainerDescriptor) dstName,
+                dstConfiguration,
+                removeSrc);
+       }
+       
+       
+    public void copy(DataStorageContainerDescriptor dstName,
+                     Properties dstConfiguration,
+                     boolean removeSrc)
+               throws IOException {
+        if (dstName == null) {
+            return;
+        }
+                       
+        if (!exists()) {
+            throw new IOException("Source does not exist " +
+                                  this);
+        }
+               
+        if (dstName.exists()) {
+            throw new IOException("Destination already exists " +
+                                  dstName);
+        }
+           
+        dstName.create();
+           
+        Iterator<DataStorageElementDescriptor> elems = iterator();
+       
+        try {
+            while (elems.hasNext()) {
+                DataStorageElementDescriptor curElem = elems.next();
+               
+                if (curElem instanceof DataStorageContainerDescriptor) {
+                    DataStorageContainerDescriptor dst =
+                        dstName.getDataStorage().asContainer(dstName,
+                                                             ((LocalPath)curElem).getPath().getName());
+                       
+                    curElem.copy(dst, dstConfiguration, removeSrc);
+                       
+                    if (removeSrc) {
+                        curElem.delete();
+                    }
+                }
+                else {
+                    DataStorageElementDescriptor dst = 
+                        dstName.getDataStorage().asElement(dstName,
+                                                 ((LocalPath)curElem).getPath().getName());
+                       
+                    curElem.copy(dst, dstConfiguration, removeSrc);
+                }
+            }
+        }
+        catch (DataStorageException e) {
+            throw new IOException("Failed to copy " + this + " to " + dstName, e);
+        }
+
+        if (removeSrc) {
+            delete();
+        }
+    }
+
+    @Override
+    public InputStream open() throws IOException {
+        throw new IOException("Cannot open dir " + path);
+    }
+
+    @Override
+    public SeekableInputStream sopen() throws IOException {
+        throw new IOException("Cannot sopen dir " + path);
+    }
+
+    @Override
+    public Iterator<DataStorageElementDescriptor> iterator() {
+        LinkedList<DataStorageElementDescriptor> elements =
+            new LinkedList<DataStorageElementDescriptor>();
+        
+        try {
+            File[] files = getCurPath().listFiles();
+            
+            for (File f : files) {
+                if (f.isFile()) {
+                    elements.add(fs.asElement(f.getPath()));
+                }
+                else {
+                    elements.add(fs.asContainer(f.getPath()));                    
+                }
+            }
+        }
+        catch (DataStorageException e) {
+        }
+        
+        return elements.iterator();
+    }
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalFile.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalFile.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalFile.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,115 @@
+
+package org.apache.pig.backend.local.datastorage;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.FileOutputStream;
+import java.io.FileInputStream;
+import java.util.Properties;
+
+import org.apache.pig.backend.datastorage.*;
+
+public class LocalFile extends LocalPath {
+
+    public LocalFile(LocalDataStorage fs, String path) {
+        super(fs, path);
+    }
+    
+    public LocalFile(LocalDataStorage fs, File path) {
+        super(fs, path);
+    }
+
+    public LocalFile(LocalDataStorage fs, String parent, String child) {
+        super(fs, parent, child);
+    }
+    
+    public LocalFile(LocalDataStorage fs, File parent, File child) {
+        super(fs,
+              parent.getPath(),
+              child.getPath());
+    }
+    
+    public LocalFile(LocalDataStorage fs, File parent, String child) {
+        this(fs, parent.getPath(), child);
+    }
+    
+    public LocalFile(LocalDataStorage fs, String parent, File child) {
+        this(fs, parent, child.getPath());
+    }
+        
+    @Override
+    public OutputStream create(Properties configuration) 
+            throws IOException {
+        if (! getCurPath().createNewFile()) {
+            throw new IOException("Failed to create file " + this.path);
+        }
+        
+        return new FileOutputStream(getCurPath());
+    }    
+    
+    @Override
+    public void copy(DataStorageElementDescriptor dstName,
+                     Properties dstConfiguration,
+            boolean removeSrc) 
+            throws IOException {
+        if (dstName == null) {
+            return;
+        }
+        
+        if (!exists()) {
+            throw new IOException("Source does not exist " +
+                                  this);
+        }
+
+        if (dstName.exists()) {
+            if (dstName instanceof DataStorageContainerDescriptor) {
+                try {
+                    dstName = dstName.getDataStorage().
+                                      asElement((DataStorageContainerDescriptor) dstName,
+                                                path.getName());
+                }
+                catch (DataStorageException e) {
+                    throw new IOException("Unable to generate element name (src: " + 
+                                           this + ", dst: " + dstName + ")",
+                                          e);
+                }
+            }
+        }
+        
+        InputStream in = null;
+        OutputStream out = null;
+        
+        in = this.open();
+        out = dstName.create(dstConfiguration);
+            
+        byte[] data = new byte[4 * 1024];
+        int bc;
+        while((bc = in.read(data)) != -1) {
+            out.write(data, 0, bc);
+        }
+        
+        out.close();
+            
+        if (removeSrc) {
+            delete();
+        }
+    }    
+
+    @Override
+    public InputStream open () throws IOException {
+        return new FileInputStream(this.path);
+    }
+    
+    @Override
+    public SeekableInputStream sopen() throws IOException {
+        try {
+            return new LocalSeekableInputStream(this.path);
+        }
+        catch (FileNotFoundException e) {
+            throw new IOException("Unable to find " + this.path, e);
+        }
+    }
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalPath.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalPath.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalPath.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalPath.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,155 @@
+package org.apache.pig.backend.local.datastorage;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.File;
+import java.util.Properties;
+
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.DataStorageElementDescriptor;
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+
+public abstract class LocalPath implements DataStorageElementDescriptor {
+
+    //Comparable<DataStorageElementDescriptor> {
+        
+    protected DataStorage fs;
+    protected File path;
+
+    protected File getCurPath() {
+    	File path;
+    	
+    	if (this.path.isAbsolute()) {
+    		path = this.path;
+    	}
+    	else {
+    		path = new File(fs.getActiveContainer().toString(),
+    						this.path.getPath());
+    	}
+    	
+    	return path;
+    }
+    
+    public LocalPath(LocalDataStorage fs, String path) {
+        this.fs = fs;
+        this.path = new File(path);
+    }
+    
+    public LocalPath(LocalDataStorage fs, File path) {
+        this.fs = fs;
+        this.path = new File(path.getPath());
+    }
+    
+    public LocalPath(LocalDataStorage fs, String parent, String child) {
+        this.fs = fs;
+        this.path = new File(parent, child);
+    }
+    
+    public LocalPath(LocalDataStorage fs, File parent, File child) {
+        this.fs = fs;
+        this.path = new File(parent.getPath(),
+        		             child.getPath());
+    }
+    
+    public LocalPath(LocalDataStorage fs, File parent, String child) {
+        this(fs, parent.getPath(), child);
+    }
+    
+    public LocalPath(LocalDataStorage fs, String parent, File child) {
+        this(fs, parent, child.getPath());
+    }
+    
+    @Override
+    public DataStorage getDataStorage() {
+        return fs;
+    }
+    
+    public File getPath() {
+        return this.path;
+    }
+
+    @Override
+    public abstract OutputStream create(Properties configuration) 
+            throws IOException;
+
+    @Override
+    public OutputStream create() 
+            throws IOException {
+        return create(null);
+    }
+
+    @Override
+    public abstract void copy(DataStorageElementDescriptor dstName,
+                              Properties dstConfiguration,
+                              boolean removeSrc) 
+            throws IOException;
+        
+    @Override
+    public void copy(DataStorageElementDescriptor dstName,
+                     boolean removeSrc) throws IOException {
+        copy(dstName, null, removeSrc);
+    }
+                
+    @Override
+    public abstract InputStream open() throws IOException;
+
+    @Override
+    public abstract SeekableInputStream sopen() throws IOException;
+        
+    @Override
+    public boolean exists() throws IOException {
+        return getCurPath().exists();
+    }
+    
+    @Override
+    public void rename(DataStorageElementDescriptor newName) 
+            throws IOException {
+        if (! this.path.renameTo(((LocalPath)newName).path)) {
+            throw new IOException("Unalbe to rename " + this.path +
+                                  "to " + ((LocalPath)newName).path);
+        }
+    }
+
+    @Override
+    public void delete() throws IOException {
+        getCurPath().delete();
+    }
+
+    @Override
+    public Properties getConfiguration() throws IOException {
+        Properties props = new Properties();
+        
+        props.put(BLOCK_REPLICATION_KEY, "1");
+        
+        return props;
+    }
+
+    @Override
+    public void updateConfiguration(Properties newConfig) 
+            throws IOException {
+        ;
+    }
+        
+    @Override
+    public Properties getStatistics() throws IOException {
+        Properties stats = new Properties();
+
+        long size = this.path.length();
+        stats.put(LENGTH_KEY , (new Long(size)).toString());
+
+        long lastModified = this.path.lastModified();
+        stats.put(MODIFICATION_TIME_KEY, (new Long(lastModified)).toString());
+        
+        return stats;
+    }
+
+    @Override
+    public int compareTo(DataStorageElementDescriptor other) {
+        return this.path.compareTo(((LocalPath)other).path);
+    }
+    
+    public String toString() {
+        return this.path.toString();
+    }
+}

Added: incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalSeekableInputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalSeekableInputStream.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalSeekableInputStream.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalSeekableInputStream.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,107 @@
+package org.apache.pig.backend.local.datastorage;
+
+import java.io.RandomAccessFile;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.pig.backend.datastorage.*;
+
+public class LocalSeekableInputStream extends SeekableInputStream {
+
+    protected RandomAccessFile file;
+    protected long curMark;
+    
+    public LocalSeekableInputStream(File file) throws FileNotFoundException {
+        this.file = new RandomAccessFile(file, "r");
+        this.curMark = 0;
+    }
+    
+    @Override
+    public void seek(long offset, FLAGS whence) throws IOException {
+        long targetPos;
+        
+        switch (whence) {
+        case SEEK_SET: {
+            targetPos = offset;
+            break;
+        }
+        case SEEK_CUR: {
+            targetPos = this.file.getFilePointer() + offset;
+            break;
+        }
+        case SEEK_END: {
+            targetPos = this.file.length() + offset;
+            break;
+        }
+        default: {
+            throw new IOException("Invalid seek option: " + whence);
+        }
+        }
+        
+        this.file.seek(targetPos);
+    }
+    
+    @Override
+    public long tell() throws IOException {
+        return this.file.getFilePointer();
+    }
+    
+    @Override
+    public int read() throws IOException {
+        return this.file.read();
+    }
+    
+    @Override
+    public int read(byte[] b) throws IOException {
+        return this.file.read(b);
+    }
+        
+    @Override
+    public int read(byte[] b, int off, int len ) throws IOException {
+        return this.file.read(b, off, len);
+    }
+    
+    @Override
+    public int available() throws IOException {
+        throw new IOException("No information on available bytes");
+    }
+    
+    @Override
+    public long skip(long n) throws IOException {
+        long skipped = 0;
+        
+        if (n > 0) {
+            skipped = this.file.length() - tell();
+
+            seek(n, FLAGS.SEEK_CUR);
+        }
+        
+        return skipped;
+    }
+    
+    @Override
+    public void close() throws IOException {
+        this.file.close();
+    }
+    
+    @Override
+    public void mark(int readlimit) {
+        try {
+            this.curMark = tell();
+        }
+        catch (IOException e) {
+            ;
+        }
+    }
+    
+    @Override
+    public void reset() throws IOException {
+        seek(this.curMark, FLAGS.SEEK_SET);
+    }
+    
+    @Override
+    public boolean markSupported() {
+        return true;
+    }
+}