You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2007/12/13 00:10:53 UTC
svn commit: r603773 [1/2] - in /incubator/pig/branches/plan:
src/org/apache/pig/backend/ src/org/apache/pig/backend/datastorage/
src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/
src/org/apache/pig/backend/hadoop/datastorag...
Author: olga
Date: Wed Dec 12 15:10:51 2007
New Revision: 603773
URL: http://svn.apache.org/viewvc?rev=603773&view=rev
Log:
PIG-32: incremental changes to split logical and physical plan; second take
Added:
incubator/pig/branches/plan/src/org/apache/pig/backend/
incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/
incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorage.java
incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageContainerDescriptor.java
incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageElementDescriptor.java
incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageException.java
incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/ImmutableOutputStream.java
incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/SeekableInputStream.java
incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/
incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineException.java
incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalOperator.java
incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalPlan.java
incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineNotificationEvent.java
incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEnginePhysicalPlan.java
incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/
incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/
incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HConfiguration.java
incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDirectory.java
incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java
incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/executionengine/
incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
incubator/pig/branches/plan/src/org/apache/pig/backend/local/
incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/
incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java
incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDir.java
incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalFile.java
incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalPath.java
incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalSeekableInputStream.java
incubator/pig/branches/plan/test/org/apache/pig/test/TestAbstractionHadoopDataStorage.java
incubator/pig/branches/plan/test/org/apache/pig/test/TestAbstractionLocalDataStorage.java
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorage.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorage.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorage.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,110 @@
+package org.apache.pig.backend.datastorage;
+
+import java.util.Properties;
+import java.io.IOException;
+/** Abstraction of a storage back-end: lifecycle, configuration, statistics and descriptor factories. */
+public interface DataStorage {
+
+ public static final String DEFAULT_REPLICATION_FACTOR_KEY = "default.replication.factor";
+ public static final String USED_BYTES_KEY = "used.bytes";
+
+ // Property names; HDataStorage uses the first in getConfiguration() and the second in getStatistics().
+ // TODO: more keys
+ //
+
+ /**
+ * Placeholder for possible initialization activities.
+ */
+ public void init();
+
+ /**
+ * Cleans up and releases resources.
+ */
+ public void close() throws IOException;
+
+ /**
+ * Provides configuration information about the storage itself.
+ * For instance global data-replication policies if any, default
+ * values, ... Some of these values could be overridden at a finer
+ * granularity (e.g. on a specific object in the Data Storage).
+ *
+ * @return configuration information
+ */
+ public Properties getConfiguration();
+
+ /**
+ * Provides a way to change configuration parameters
+ * at the Data Storage level. For instance, change the
+ * data replication policy.
+ *
+ * @param newConfiguration the new configuration settings
+ * @throws DataStorageException when configuration conflicts are detected
+ *
+ */
+ public void updateConfiguration(Properties newConfiguration)
+ throws DataStorageException;
+
+ /**
+ * Provides statistics on the Storage: capacity values, how much
+ * storage is in use...
+ * @return statistics on the Data Storage
+ */
+ public Properties getStatistics() throws IOException;
+
+ /**
+ * Creates an entity handle for an object (no containment
+ * relation) from a String.
+ *
+ * @param name name of the object
+ * @return an object descriptor
+ * @throws DataStorageException if name does not conform to naming
+ * convention enforced by the Data Storage.
+ */
+ public DataStorageElementDescriptor asElement(String name)
+ throws DataStorageException;
+
+ public DataStorageElementDescriptor asElement(DataStorageElementDescriptor element)
+ throws DataStorageException;
+
+ public DataStorageElementDescriptor asElement(String parent,
+ String child)
+ throws DataStorageException;
+
+ public DataStorageElementDescriptor asElement(DataStorageContainerDescriptor parent,
+ String child)
+ throws DataStorageException;
+
+ public DataStorageElementDescriptor asElement(DataStorageContainerDescriptor parent,
+ DataStorageElementDescriptor child)
+ throws DataStorageException;
+
+ /**
+ * Creates an entity handle for a container.
+ *
+ * @param name name of the container
+ * @return a container descriptor
+ * @throws DataStorageException if name does not conform to naming
+ * convention enforced by the Data Storage.
+ */
+ public DataStorageContainerDescriptor asContainer(String name)
+ throws DataStorageException;
+
+ public DataStorageContainerDescriptor asContainer(DataStorageContainerDescriptor container)
+ throws DataStorageException;
+
+ public DataStorageContainerDescriptor asContainer(String parent,
+ String child)
+ throws DataStorageException;
+
+ public DataStorageContainerDescriptor asContainer(DataStorageContainerDescriptor parent,
+ String child)
+ throws DataStorageException;
+
+ public DataStorageContainerDescriptor asContainer(DataStorageContainerDescriptor parent,
+ DataStorageContainerDescriptor child)
+ throws DataStorageException;
+
+ public void setActiveContainer(DataStorageContainerDescriptor container);
+
+ public DataStorageContainerDescriptor getActiveContainer();
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageContainerDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageContainerDescriptor.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageContainerDescriptor.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageContainerDescriptor.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,7 @@
+
+package org.apache.pig.backend.datastorage;
+/** A container: an element descriptor that can also be iterated as element descriptors. */
+public interface DataStorageContainerDescriptor
+ extends DataStorageElementDescriptor,
+ Iterable<DataStorageElementDescriptor> {
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageElementDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageElementDescriptor.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageElementDescriptor.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageElementDescriptor.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,120 @@
+package org.apache.pig.backend.datastorage;
+
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+import java.util.Properties;
+/** Handle on one entity of a DataStorage: create, copy, open, rename, delete, configuration and statistics. */
+public interface DataStorageElementDescriptor extends
+ Comparable<DataStorageElementDescriptor> {
+
+ public static final String BLOCK_SIZE_KEY = "path.block.size";
+ public static final String BLOCK_REPLICATION_KEY = "path.block.replication";
+ public static final String LENGTH_KEY = "path.length";
+ public static final String MODIFICATION_TIME_KEY = "path.modification.time";
+
+ //
+ // TODO: more keys
+ //
+
+ public DataStorage getDataStorage();
+
+ /**
+ * Opens a stream to which the entity can be written.
+ *
+ * @param configuration configuration information at the object level
+ * @return stream to write to
+ * @throws IOException
+ */
+ public OutputStream create(Properties configuration)
+ throws IOException;
+
+ public OutputStream create()
+ throws IOException;
+
+ /**
+ * Copies this entity to a new one, possibly residing in a
+ * different Data Storage.
+ *
+ * @param dstName name of entity to create
+ * @param dstConfiguration configuration for the new entity
+ * @param removeSrc if src entity needs to be removed after copying it
+ * @throws IOException for instance, configuration
+ * information for new entity is not compatible with
+ * configuration information at the Data
+ * Storage level, user does not have privileges to read from
+ * source entity or write to destination storage...
+ */
+ public void copy(DataStorageElementDescriptor dstName,
+ Properties dstConfiguration,
+ boolean removeSrc)
+ throws IOException;
+
+ public void copy(DataStorageElementDescriptor dstName,
+ boolean removeSrc)
+ throws IOException;
+
+ /**
+ * Opens this entity for reading.
+ *
+ * @return stream to read the entity from
+ * @throws IOException e.g. entity does not exist...
+ */
+ public InputStream open() throws IOException;
+
+ /**
+ * Open an element in the Data Storage with support for random access
+ * (seek operations).
+ *
+ * @return a seekable input stream
+ * @throws IOException
+ */
+ public SeekableInputStream sopen()
+ throws IOException;
+
+ /**
+ * Checks whether the entity exists or not.
+ *
+ * @return true if entity exists, false otherwise.
+ * @throws IOException
+ */
+ public boolean exists() throws IOException;
+
+ /**
+ * Changes the name of an entity in the Data Storage.
+ *
+ * @param newName new name of entity
+ * @throws IOException
+ */
+ public void rename(DataStorageElementDescriptor newName)
+ throws IOException;
+
+ /**
+ * Remove entity from the Data Storage.
+ *
+ * @throws IOException
+ */
+ public void delete() throws IOException;
+
+ /**
+ * Retrieve configuration information for entity.
+ * @return configuration
+ */
+ public Properties getConfiguration() throws IOException;
+
+ /**
+ * Update configuration information for this entity.
+ *
+ * @param newConfig configuration
+ * @throws IOException
+ */
+ public void updateConfiguration(Properties newConfig)
+ throws IOException;
+
+ /**
+ * List entity statistics.
+ * @return statistics for this entity
+ */
+ public Properties getStatistics() throws IOException;
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageException.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageException.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageException.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/DataStorageException.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,22 @@
+package org.apache.pig.backend.datastorage;
+/** Checked exception declared by the DataStorage API (updateConfiguration, asElement, asContainer). */
+public class DataStorageException extends Exception { // Exception, not bare Throwable, so catch (Exception e) handlers see it
+
+ static final long serialVersionUID = 1;
+
+ public DataStorageException(String message, Throwable cause) { // all other constructors delegate here
+ super(message, cause);
+ }
+
+ public DataStorageException() { // no detail message, no cause
+ this(null, null);
+ }
+
+ public DataStorageException(String message) { // detail message only
+ this(message, null);
+ }
+
+ public DataStorageException(Throwable cause) { // cause only; the detail message stays null
+ this(null, cause);
+ }
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/ImmutableOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/ImmutableOutputStream.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/ImmutableOutputStream.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/ImmutableOutputStream.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,18 @@
+package org.apache.pig.backend.datastorage;
+
+import java.io.OutputStream;
+import java.io.IOException;
+/** OutputStream whose write(int) always throws IOException; HDirectory.create returns one for a directory path. */
+public class ImmutableOutputStream extends OutputStream {
+
+ private String destination; // only used to build the IOException message
+
+ public ImmutableOutputStream(String destination) {
+ super();
+ this.destination = destination;
+ }
+
+ public void write(int b) throws IOException { // unconditionally rejects the write
+ throw new IOException("Write not supported on " + destination);
+ }
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/SeekableInputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/SeekableInputStream.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/SeekableInputStream.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/datastorage/SeekableInputStream.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,33 @@
+package org.apache.pig.backend.datastorage;
+
+import java.io.InputStream;
+import java.io.IOException;
+/** InputStream with random access: seek to an offset and report the current offset. */
+public abstract class SeekableInputStream extends InputStream {
+
+ public enum FLAGS {
+ SEEK_SET,
+ SEEK_CUR,
+ SEEK_END,
+ }
+
+ /**
+ * Seeks to a given offset as specified by whence flags.
+ * If whence is SEEK_SET, offset is added to beginning of stream
+ * If whence is SEEK_CUR, offset is added to current position inside stream
+ * If whence is SEEK_END, offset is added to end of file position
+ *
+ * @param offset amount added to the position selected by whence
+ * @param whence reference position: SEEK_SET, SEEK_CUR or SEEK_END
+ * @throws IOException
+ */
+ public abstract void seek(long offset, FLAGS whence) throws IOException;
+
+ /**
+ * Returns current offset
+ *
+ * @return offset
+ * @throws IOException
+ */
+ public abstract long tell() throws IOException;
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,76 @@
+package org.apache.pig.backend.executionengine;
+
+import java.util.Collection;
+import java.util.Properties;
+
+/**
+ * This is the main interface that various execution engines
+ * need to support and it is also the main interface that Pig
+ * will need to use to submit jobs for execution, retrieve information
+ * about their progress and possibly terminate them.
+ *
+ */
+public interface ExecutionEngine {
+ /**
+ * Placeholder for possible initialization activities.
+ */
+ public void init();
+
+ /**
+ * Cleans up and releases resources.
+ */
+ public void close();
+
+
+ /**
+ * Provides configuration information about the execution engine itself.
+ *
+ * @return information about the configuration used to connect to execution engine
+ */
+ public Properties getConfiguration();
+
+ /**
+ * Provides a way to dynamically change configuration parameters
+ * at the Execution Engine level.
+ *
+ * @param newConfiguration the new configuration settings
+ * @throws ExecutionEngineException when configuration conflicts are detected
+ *
+ */
+ public void updateConfiguration(Properties newConfiguration)
+ throws ExecutionEngineException;
+
+ /**
+ * Provides statistics on the Execution Engine: number of nodes,
+ * node failure rates, average load, average job wait time...
+ * @return statistics on the Execution Engine
+ */
+ public Properties getStatistics() throws ExecutionEngineException;
+
+ /**
+ * Compiles a logical plan into a physical plan, given a set of configuration
+ * properties that apply at the plan-level. For instance desired degree of
+ * parallelism for this plan, which could be different from the "default"
+ * one set at the execution engine level.
+ *
+ * @param plan logical plan
+ * @param properties plan-level configuration properties
+ * @return physical plan
+ */
+ public ExecutionEnginePhysicalPlan compile(ExecutionEngineLogicalPlan plan,
+ Properties properties)
+ throws ExecutionEngineException;
+
+ /**
+ * This may be useful to support admin functionalities.
+ *
+ * @return a collection of jobs "known" to the execution engine,
+ * say jobs currently queued up or running (this can be determined
+ * by obtaining the properties of the job)
+ *
+ * @throws ExecutionEngineException maybe the user does not have privileges
+ * to obtain this information...
+ */
+ public Collection<ExecutionEnginePhysicalPlan> physicalPlans ()
+ throws ExecutionEngineException;
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineException.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineException.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineException.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineException.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,21 @@
+package org.apache.pig.backend.executionengine;
+/** Checked exception declared by the ExecutionEngine and ExecutionEnginePhysicalPlan APIs. */
+public class ExecutionEngineException extends Exception { // Exception, not bare Throwable, so catch (Exception e) handlers see it
+ static final long serialVersionUID = 1;
+
+ public ExecutionEngineException(String message, Throwable cause) { // all other constructors delegate here
+ super(message, cause);
+ }
+
+ public ExecutionEngineException() { // no detail message, no cause
+ this(null, null);
+ }
+
+ public ExecutionEngineException(String message) { // detail message only
+ this(message, null);
+ }
+
+ public ExecutionEngineException(Throwable cause) { // cause only; the detail message stays null
+ this(null, cause);
+ }
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalOperator.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalOperator.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalOperator.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,12 @@
+package org.apache.pig.backend.executionengine;
+
+import java.io.Serializable;
+/** Serializable logical operator identified by getScope()/getId(), with children addressed by position. */
+public interface ExecutionEngineLogicalOperator extends Serializable {
+
+ // NOTE(review): comment originally read "catenation of group and name is a GUId"; presumably scope + id now - confirm
+ public String getScope();
+ public long getId();
+
+ public ExecutionEngineLogicalOperator childAt(int position);
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalPlan.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalPlan.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineLogicalPlan.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,5 @@
+package org.apache.pig.backend.executionengine;
+/** Logical plan handle; declares no members in this revision (parameter type of ExecutionEngine.compile). */
+public interface ExecutionEngineLogicalPlan {
+
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineNotificationEvent.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineNotificationEvent.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineNotificationEvent.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEngineNotificationEvent.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,5 @@
+package org.apache.pig.backend.executionengine;
+/** Notification event type; declares no members in this revision (parameter type of ExecutionEnginePhysicalPlan.notify). */
+public interface ExecutionEngineNotificationEvent {
+
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEnginePhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEnginePhysicalPlan.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEnginePhysicalPlan.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/executionengine/ExecutionEnginePhysicalPlan.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,96 @@
+package org.apache.pig.backend.executionengine;
+
+import java.util.Properties;
+import java.util.Iterator;
+
+import org.apache.pig.data.Tuple;
+/** Handle on a compiled physical plan (a job): run it, inspect it, reconfigure it, fetch results or kill it. */
+public interface ExecutionEnginePhysicalPlan {
+
+ /*
+ * As the front end is not really supposed to do much with the physical representation,
+ * the handle to the physical plan can just be a GUID in the back-end state, which
+ * brings up the problem of persistency...
+ */
+
+
+ /**
+ * Execute the physical plan.
+ * This is non-blocking. See getStatistics to pull information
+ * about the job.
+ *
+ * @throws ExecutionEngineException
+ */
+ public void execute() throws ExecutionEngineException;
+
+ public void submit() throws ExecutionEngineException;
+
+ /**
+ * A job may have properties, like a priority, degree of parallelism...
+ * Some of such properties may be inherited from the ExecutionEngine
+ * configuration, others may have been set specifically for this job.
+ * For instance, a job scheduler may attribute low priority to
+ * jobs automatically started for maintenance purpose.
+ *
+ * @return set of properties
+ */
+ public Properties getConfiguration();
+
+ /**
+ * True if the physical plan has executed successfully and results are ready
+ * to be retrieved.
+ *
+ * @return true if the plan has executed successfully
+ * @throws ExecutionEngineException
+ */
+ public boolean hasExecuted() throws ExecutionEngineException;
+
+ /**
+ * If the query has executed successfully we want to retrieve the results
+ * via iterating over them.
+ *
+ * @return iterator over the result tuples
+ * @throws ExecutionEngineException
+ */
+ public Iterator<Tuple> getResults() throws ExecutionEngineException;
+
+ /**
+ * Some properties of the job may be changed, say the priority...
+ *
+ * @param configuration the properties to change
+ * @throws ExecutionEngineException some changes may not be allowed, for instance some
+ * job-level properties cannot override Execution-Engine-level properties
+ * or maybe some properties can only be changed in certain states of the
+ * job, say, once the job is started, parallelism level may not be changed...
+ */
+ public void updateConfiguration(Properties configuration)
+ throws ExecutionEngineException;
+
+ /**
+ * Hook to provide asynchronous notifications.
+ *
+ */
+ public void notify(ExecutionEngineNotificationEvent event);
+
+ /**
+ * Kills current job.
+ *
+ * @throws ExecutionEngineException
+ */
+ public void kill() throws ExecutionEngineException;
+
+ /**
+ * Can be information about the state (not submitted, e.g. the execute method
+ * has not been called yet; not running, e.g. execute has been issued,
+ * but job is waiting; running...; completed; aborted...) or progress information.
+ *
+ * @return properties describing the state and progress of the job
+ */
+ public Properties getStatistics();
+
+ /**
+ * To provide an "explanation" about how the physical plan has been constructed
+ *
+ */
+ public void explain();
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HConfiguration.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HConfiguration.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HConfiguration.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,52 @@
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Enumeration;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Properties;
+
+/** java.util.Properties holding the entries of a Hadoop Configuration, convertible in both directions. */
+public class HConfiguration extends Properties {
+
+ private static final long serialVersionUID = 1L;
+
+ public HConfiguration() {
+ }
+
+ // Copies every entry of the given Hadoop Configuration into this object; a null argument leaves it empty.
+ // TODO: this implementation has a problem: it does not
+ // respect final attributes
+ //
+ public HConfiguration(Configuration other) {
+ if (other != null) {
+ Iterator<Map.Entry<String, String>> iter = other.iterator();
+
+ while (iter.hasNext()) {
+ Map.Entry<String, String> entry = iter.next();
+
+ put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ public Configuration getConfiguration() {
+ Configuration config = new Configuration();
+ // set each key returned by keys() on the new Configuration; keys are cast to String
+ Enumeration<Object> iter = keys();
+
+ while (iter.hasMoreElements()) {
+ String key = (String) iter.nextElement();
+ String val = getProperty(key);
+
+ config.set(key, val);
+ }
+
+ return config;
+ }
+}
+
+
+
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,162 @@
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.net.URI;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Enumeration;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.pig.backend.datastorage.*;
+
+/** DataStorage implementation that delegates to a Hadoop FileSystem. */
+public class HDataStorage implements DataStorage {
+
+ private FileSystem fs;
+ // every constructor obtains the FileSystem through FileSystem.get, optionally for a given URI
+ public HDataStorage(URI uri, Configuration conf) throws IOException {
+ this(uri, new HConfiguration(conf));
+ }
+
+ public HDataStorage(URI uri, HConfiguration conf) throws IOException {
+ fs = FileSystem.get(uri, conf.getConfiguration());
+ }
+
+ public HDataStorage(Configuration conf) throws IOException {
+ this(new HConfiguration(conf));
+ }
+
+ public HDataStorage(HConfiguration conf) throws IOException {
+ fs = FileSystem.get(conf.getConfiguration());
+ }
+
+ @Override
+ public void init() { }
+
+ @Override
+ public void close() throws IOException {
+ fs.close();
+ }
+
+ @Override
+ public Properties getConfiguration() {
+ Properties props = new HConfiguration(fs.getConf());
+
+ // Short.toString formats the primitive directly instead of boxing it first
+ short defaultReplication = fs.getDefaultReplication();
+ props.setProperty(DEFAULT_REPLICATION_FACTOR_KEY, Short.toString(defaultReplication));
+
+ return props;
+ }
+
+ @Override
+ public void updateConfiguration(Properties newConfiguration)
+ throws DataStorageException {
+ if (newConfiguration == null) {
+ return;
+ }
+
+ Enumeration<Object> newKeys = newConfiguration.keys();
+
+ while (newKeys.hasMoreElements()) {
+ // keys() yields Objects; each key is cast to String as before
+ String key = (String) newKeys.nextElement();
+ String value = newConfiguration.getProperty(key);
+
+ // applied to the Configuration object returned by the FileSystem
+ fs.getConf().set(key, value);
+ }
+ }
+
+ @Override
+ public Properties getStatistics() throws IOException {
+ Properties stats = new Properties();
+ long bytes = fs.getUsed();
+ // only the used-bytes figure is reported; Long.toString avoids boxing
+ stats.setProperty(USED_BYTES_KEY, Long.toString(bytes));
+
+ return stats;
+ }
+ // element descriptors: every asElement overload builds an HFile bound to this storage
+ @Override
+ public DataStorageElementDescriptor asElement(String name)
+ throws DataStorageException {
+ return new HFile(this, name);
+ }
+
+ @Override
+ public DataStorageElementDescriptor asElement(DataStorageElementDescriptor element)
+ throws DataStorageException {
+ return new HFile(this, element.toString());
+ }
+
+ @Override
+ public DataStorageElementDescriptor asElement(String parent,
+ String child)
+ throws DataStorageException {
+ return new HFile(this, parent, child);
+ }
+
+ @Override
+ public DataStorageElementDescriptor asElement(DataStorageContainerDescriptor parent,
+ String child)
+ throws DataStorageException {
+ return new HFile(this, parent.toString(), child);
+ }
+
+ @Override
+ public DataStorageElementDescriptor asElement(DataStorageContainerDescriptor parent,
+ DataStorageElementDescriptor child)
+ throws DataStorageException {
+ return new HFile(this, parent.toString(), child.toString());
+ }
+ // container descriptors: every asContainer overload builds an HDirectory bound to this storage
+ @Override
+ public DataStorageContainerDescriptor asContainer(String name)
+ throws DataStorageException {
+ return new HDirectory(this, name);
+ }
+
+ @Override
+ public DataStorageContainerDescriptor asContainer(DataStorageContainerDescriptor container)
+ throws DataStorageException {
+ return new HDirectory(this, container.toString());
+ }
+
+ @Override
+ public DataStorageContainerDescriptor asContainer(String parent,
+ String child)
+ throws DataStorageException {
+ return new HDirectory(this, parent, child);
+ }
+
+ @Override
+ public DataStorageContainerDescriptor asContainer(DataStorageContainerDescriptor parent,
+ String child)
+ throws DataStorageException {
+ return new HDirectory(this, parent.toString(), child);
+ }
+
+ @Override
+ public DataStorageContainerDescriptor asContainer(DataStorageContainerDescriptor parent,
+ DataStorageContainerDescriptor child)
+ throws DataStorageException {
+ return new HDirectory(this, parent.toString(), child.toString());
+ }
+ // the active container is the Hadoop FileSystem working directory
+ @Override
+ public void setActiveContainer(DataStorageContainerDescriptor container) {
+ fs.setWorkingDirectory(new Path(container.toString()));
+ }
+
+ @Override
+ public DataStorageContainerDescriptor getActiveContainer() {
+ return new HDirectory(this, fs.getWorkingDirectory());
+ }
+ // exposes the underlying Hadoop FileSystem (HDirectory.create calls it for mkdirs)
+ public FileSystem getHFS() {
+ return fs;
+ }
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDirectory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDirectory.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDirectory.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HDirectory.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,148 @@
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.pig.backend.datastorage.*;
+
+/**
+ * Directory ("container") descriptor for the Hadoop-backed data storage.
+ * <p>
+ * Creation maps to the file system's {@code mkdirs} call, copying walks the
+ * children one by one, and the stream-opening operations are rejected.
+ */
+public class HDirectory extends HPath
+        implements DataStorageContainerDescriptor {
+
+    public HDirectory(HDataStorage fs, Path parent, Path child) {
+        super(fs, parent, child);
+    }
+
+    public HDirectory(HDataStorage fs, String parent, String child) {
+        super(fs, parent, child);
+    }
+
+    public HDirectory(HDataStorage fs, Path parent, String child) {
+        super(fs, parent, child);
+    }
+
+    public HDirectory(HDataStorage fs, String parent, Path child) {
+        super(fs, parent, child);
+    }
+
+    public HDirectory(HDataStorage fs, Path path) {
+        super(fs, path);
+    }
+
+    public HDirectory(HDataStorage fs, String pathString) {
+        super(fs, pathString);
+    }
+
+    /**
+     * Creates this directory through the file system's {@code mkdirs} call.
+     *
+     * @param configuration not consulted by this implementation
+     * @return an {@link ImmutableOutputStream} built from this path's string form
+     * @throws IOException if the file system call fails
+     */
+    @Override
+    public OutputStream create(Properties configuration)
+            throws IOException {
+        // NOTE(review): the result of mkdirs is not checked here, whereas
+        // LocalDir.create throws when directory creation reports failure.
+        fs.getHFS().mkdirs(path);
+
+        return new ImmutableOutputStream(path.toString());
+    }
+
+    /**
+     * Copies this directory to {@code dstName}, which must be a container.
+     *
+     * @param dstName          destination; {@code null} makes the call a no-op
+     * @param dstConfiguration configuration handed to every child copy
+     * @param removeSrc        when {@code true}, the source is deleted after copying
+     * @throws IOException if {@code dstName} is not a container descriptor or
+     *                     the copy itself fails
+     */
+    @Override
+    public void copy(DataStorageElementDescriptor dstName,
+                     Properties dstConfiguration,
+                     boolean removeSrc)
+            throws IOException {
+        // Report a wrong destination kind as an I/O failure instead of
+        // letting the cast below blow up with a ClassCastException.
+        if (dstName != null
+                && !(dstName instanceof DataStorageContainerDescriptor)) {
+            throw new IOException("Cannot copy dir " + path +
+                                  " to non-container destination " + dstName);
+        }
+
+        copy((DataStorageContainerDescriptor) dstName,
+             dstConfiguration,
+             removeSrc);
+    }
+
+    /**
+     * Copies this directory and its children into a new container.
+     * <p>
+     * The destination must not exist yet; it is created first and then every
+     * child is copied into it (sub-directories through their own
+     * {@code copy}).
+     *
+     * @param dstName          destination container; {@code null} makes the call a no-op
+     * @param dstConfiguration configuration handed to every child copy
+     * @param removeSrc        when {@code true}, children and finally this
+     *                         directory are deleted after being copied
+     * @throws IOException if the source is missing, the destination already
+     *                     exists, the children cannot be listed, or a child
+     *                     copy fails
+     */
+    public void copy(DataStorageContainerDescriptor dstName,
+                     Properties dstConfiguration,
+                     boolean removeSrc)
+            throws IOException {
+        if (dstName == null) {
+            return;
+        }
+
+        if (!exists()) {
+            throw new IOException("Source does not exist " +
+                                  this);
+        }
+
+        if (dstName.exists()) {
+            throw new IOException("Destination already exists " +
+                                  dstName);
+        }
+
+        dstName.create();
+
+        try {
+            // Do not go through iterator() here: it hides listing failures,
+            // and with removeSrc set that would delete the source after
+            // copying nothing.
+            LinkedList<DataStorageElementDescriptor> children =
+                new LinkedList<DataStorageElementDescriptor>();
+            collectChildren(children);
+
+            for (DataStorageElementDescriptor curElem : children) {
+                String childName = ((HPath) curElem).getPath().getName();
+
+                if (curElem instanceof DataStorageContainerDescriptor) {
+                    DataStorageContainerDescriptor dst =
+                        dstName.getDataStorage().asContainer(dstName, childName);
+
+                    curElem.copy(dst, dstConfiguration, removeSrc);
+
+                    if (removeSrc) {
+                        curElem.delete();
+                    }
+                }
+                else {
+                    DataStorageElementDescriptor dst =
+                        dstName.getDataStorage().asElement(dstName, childName);
+
+                    curElem.copy(dst, dstConfiguration, removeSrc);
+                }
+            }
+        }
+        catch (DataStorageException e) {
+            throw new IOException("Failed to copy " + this + " to " + dstName, e);
+        }
+
+        if (removeSrc) {
+            delete();
+        }
+    }
+
+    /**
+     * Always fails: this descriptor refuses to be opened as a stream.
+     *
+     * @throws IOException always
+     */
+    @Override
+    public InputStream open() throws IOException {
+        throw new IOException("Cannot open dir " + path);
+    }
+
+    /**
+     * Always fails: this descriptor refuses to be opened as a seekable stream.
+     *
+     * @throws IOException always
+     */
+    @Override
+    public SeekableInputStream sopen() throws IOException {
+        throw new IOException("Cannot sopen dir " + path);
+    }
+
+    /**
+     * Returns an iterator over a snapshot of this directory's children.
+     * <p>
+     * Listing is best effort: if it fails part-way, the iterator covers only
+     * the children gathered up to that point (possibly none).
+     *
+     * @return iterator over file and directory descriptors
+     */
+    public Iterator<DataStorageElementDescriptor> iterator() {
+        LinkedList<DataStorageElementDescriptor> elements =
+            new LinkedList<DataStorageElementDescriptor>();
+
+        try {
+            collectChildren(elements);
+        }
+        catch (IOException ignored) {
+            // best effort: the signature cannot report the failure
+        }
+        catch (DataStorageException ignored) {
+            // best effort: the signature cannot report the failure
+        }
+
+        return elements.iterator();
+    }
+
+    /**
+     * Appends a descriptor for every child of this directory to {@code into}:
+     * an element for each path the file system reports as a file, a container
+     * for everything else.
+     */
+    private void collectChildren(LinkedList<DataStorageElementDescriptor> into)
+            throws IOException, DataStorageException {
+        Path[] paths = fs.getHFS().listPaths(new Path[]{path});
+
+        for (Path p : paths) {
+            if (fs.getHFS().isFile(p)) {
+                into.add(fs.asElement(p.toString()));
+            }
+            else {
+                into.add(fs.asContainer(p.toString()));
+            }
+        }
+    }
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HFile.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HFile.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HFile.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,105 @@
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.pig.backend.datastorage.DataStorageElementDescriptor;
+import org.apache.pig.backend.datastorage.DataStorageContainerDescriptor;
+import org.apache.pig.backend.datastorage.DataStorageException;
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+
+/**
+ * File ("element") descriptor for the Hadoop-backed data storage.
+ */
+public class HFile extends HPath {
+
+    public HFile(HDataStorage fs, Path parent, Path child) {
+        super(fs, parent, child);
+    }
+
+    public HFile(HDataStorage fs, String parent, String child) {
+        super(fs, parent, child);
+    }
+
+    public HFile(HDataStorage fs, Path parent, String child) {
+        super(fs, parent, child);
+    }
+
+    public HFile(HDataStorage fs, String parent, Path child) {
+        super(fs, parent, child);
+    }
+
+    public HFile(HDataStorage fs, String pathString) {
+        super(fs, pathString);
+    }
+
+    public HFile(HDataStorage fs, Path path) {
+        super(fs, path);
+    }
+
+    /**
+     * Creates the file and returns a stream for writing to it.
+     *
+     * @param configuration not consulted by this implementation
+     * @return the stream returned by the file system's {@code create(path, false)}
+     * @throws IOException if the file system call fails
+     */
+    @Override
+    public OutputStream create(Properties configuration)
+            throws IOException {
+        return fs.getHFS().create(path, false);
+    }
+
+    /**
+     * Copies the content of this file to {@code dstName}.
+     * <p>
+     * If the destination exists and is a container, the copy is placed
+     * inside it under this file's own name.
+     *
+     * @param dstName          destination; {@code null} makes the call a no-op
+     * @param dstConfiguration configuration passed to the destination's {@code create}
+     * @param removeSrc        when {@code true}, this file is deleted once the
+     *                         copy has completed
+     * @throws IOException if the source is missing or any read/write fails
+     */
+    @Override
+    public void copy(DataStorageElementDescriptor dstName,
+                     Properties dstConfiguration,
+                     boolean removeSrc)
+            throws IOException {
+        if (dstName == null) {
+            return;
+        }
+
+        if (!exists()) {
+            throw new IOException("Source does not exist " +
+                                  this);
+        }
+
+        if (dstName.exists()) {
+            if (dstName instanceof DataStorageContainerDescriptor) {
+                try {
+                    dstName = dstName.getDataStorage().
+                        asElement((DataStorageContainerDescriptor) dstName,
+                                  getPath().getName());
+                }
+                catch (DataStorageException e) {
+                    throw new IOException("Unable to generate element name (src: " +
+                                          this + ", dst: " + dstName + ")",
+                                          e);
+                }
+            }
+        }
+
+        // Both streams are released on every path, including failures.
+        InputStream in = this.open();
+        try {
+            OutputStream out = dstName.create(dstConfiguration);
+            try {
+                byte[] data = new byte[4 * 1024];
+                int bc;
+                while ((bc = in.read(data)) != -1) {
+                    out.write(data, 0, bc);
+                }
+            }
+            finally {
+                out.close();
+            }
+        }
+        finally {
+            in.close();
+        }
+
+        // Only reached when the copy and both close() calls succeeded.
+        if (removeSrc) {
+            delete();
+        }
+    }
+
+    /**
+     * Opens this file for reading.
+     *
+     * @return the stream returned by the file system's {@code open}
+     * @throws IOException if the file system call fails
+     */
+    @Override
+    public InputStream open() throws IOException {
+        return fs.getHFS().open(path);
+    }
+
+    /**
+     * Opens this file for reading with seek support.
+     *
+     * @return an {@link HSeekableInputStream} built from the opened stream and
+     *         the content length reported by the file system
+     * @throws IOException if a file system call fails
+     */
+    @Override
+    public SeekableInputStream sopen() throws IOException {
+        return new HSeekableInputStream(fs.getHFS().open(path),
+                                        fs.getHFS().getContentLength(path));
+    }
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,153 @@
+
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.pig.backend.datastorage.*;
+
+/**
+ * Common base of the Hadoop-backed descriptors: pairs a {@link Path} with
+ * the {@link HDataStorage} it belongs to and implements the operations that
+ * do not differ between files and directories.
+ */
+public abstract class HPath implements DataStorageElementDescriptor {
+
+    protected Path path;
+    protected HDataStorage fs;
+
+    public HPath(HDataStorage fs, Path parent, Path child) {
+        this.path = new Path(parent, child);
+        this.fs = fs;
+    }
+
+    public HPath(HDataStorage fs, String parent, String child) {
+        this(fs, new Path(parent), new Path(child));
+    }
+
+    public HPath(HDataStorage fs, Path parent, String child) {
+        this(fs, parent, new Path(child));
+    }
+
+    public HPath(HDataStorage fs, String parent, Path child) {
+        this(fs, new Path(parent), child);
+    }
+
+    public HPath(HDataStorage fs, String pathString) {
+        this(fs, new Path(pathString));
+    }
+
+    public HPath(HDataStorage fs, Path path) {
+        this.path = path;
+        this.fs = fs;
+    }
+
+    /** @return the storage this descriptor was created for */
+    @Override
+    public DataStorage getDataStorage() {
+        return fs;
+    }
+
+    @Override
+    public abstract OutputStream create(Properties configuration)
+            throws IOException;
+
+    @Override
+    public abstract void copy(DataStorageElementDescriptor dstName,
+                              Properties dstConfiguration,
+                              boolean removeSrc)
+            throws IOException;
+
+    @Override
+    public abstract InputStream open() throws IOException;
+
+    @Override
+    public abstract SeekableInputStream sopen() throws IOException;
+
+    /**
+     * @return whether the file system reports this path as existing
+     * @throws IOException if the file system call fails
+     */
+    @Override
+    public boolean exists() throws IOException {
+        return fs.getHFS().exists(path);
+    }
+
+    /**
+     * Renames this path to the path of {@code newName}.
+     *
+     * @param newName target descriptor; must be an {@code HPath}. A
+     *                {@code null} value makes the call a no-op.
+     * @throws IOException if the file system call fails or reports that the
+     *                     rename did not happen
+     */
+    @Override
+    public void rename(DataStorageElementDescriptor newName)
+            throws IOException {
+        if (newName == null) {
+            return;
+        }
+
+        Path target = ((HPath) newName).path;
+
+        // A false result means nothing was renamed; surface it the same way
+        // LocalPath.rename does rather than dropping it.
+        if (!fs.getHFS().rename(path, target)) {
+            throw new IOException("Unable to rename " + path +
+                                  " to " + target);
+        }
+    }
+
+    /**
+     * Deletes this path through the file system. The call's result is not
+     * inspected.
+     *
+     * @throws IOException if the file system call fails
+     */
+    @Override
+    public void delete() throws IOException {
+        fs.getHFS().delete(path);
+    }
+
+    /**
+     * Reports block size and replication of this path.
+     *
+     * @return properties holding {@code BLOCK_SIZE_KEY} and
+     *         {@code BLOCK_REPLICATION_KEY} as decimal strings
+     * @throws IOException if the file status cannot be obtained
+     */
+    @Override
+    public Properties getConfiguration() throws IOException {
+        HConfiguration props = new HConfiguration();
+
+        long blockSize = fs.getHFS().getFileStatus(path).getBlockSize();
+
+        short replication = fs.getHFS().getFileStatus(path).getReplication();
+
+        props.setProperty(BLOCK_SIZE_KEY, Long.toString(blockSize));
+        props.setProperty(BLOCK_REPLICATION_KEY, Short.toString(replication));
+
+        return props;
+    }
+
+    /**
+     * Applies the replication factor found under
+     * {@code BLOCK_REPLICATION_KEY}; other keys are not consulted.
+     *
+     * @param newConfig new settings; {@code null}, or a set without the
+     *                  replication key, leaves the path untouched
+     * @throws IOException if the file system call fails
+     * @throws NumberFormatException if the replication value is not a valid short
+     */
+    @Override
+    public void updateConfiguration(Properties newConfig) throws IOException {
+        if (newConfig == null) {
+            return;
+        }
+
+        String blkReplStr = newConfig.getProperty(BLOCK_REPLICATION_KEY);
+
+        // An absent key used to end in a NumberFormatException from parsing
+        // null; there is simply nothing to update in that case.
+        if (blkReplStr == null) {
+            return;
+        }
+
+        fs.getHFS().setReplication(path, Short.parseShort(blkReplStr));
+    }
+
+    /**
+     * Reports length and modification time of this path.
+     *
+     * @return properties holding {@code LENGTH_KEY} and
+     *         {@code MODIFICATION_TIME_KEY} as decimal strings
+     * @throws IOException if the file status cannot be obtained
+     */
+    @Override
+    public Properties getStatistics() throws IOException {
+        HConfiguration props = new HConfiguration();
+
+        long length = fs.getHFS().getFileStatus(path).getLen();
+
+        long modificationTime = fs.getHFS().getFileStatus(path).
+            getModificationTime();
+
+        props.setProperty(LENGTH_KEY, Long.toString(length));
+        props.setProperty(MODIFICATION_TIME_KEY, Long.toString(modificationTime));
+
+        return props;
+    }
+
+    /**
+     * Orders descriptors by their paths.
+     *
+     * @param other descriptor to compare with; must be an {@code HPath}
+     * @return the result of comparing the two paths
+     */
+    @Override
+    public int compareTo(DataStorageElementDescriptor other) {
+        // Compare path with path (as LocalPath.compareTo does); handing the
+        // descriptor itself to Path.compareTo compares unrelated types.
+        return path.compareTo(((HPath) other).path);
+    }
+
+    /** Same as {@link #create(Properties)} with a {@code null} configuration. */
+    @Override
+    public OutputStream create() throws IOException {
+        return create(null);
+    }
+
+    /**
+     * Same as {@link #copy(DataStorageElementDescriptor, Properties, boolean)}
+     * with a {@code null} destination configuration.
+     */
+    @Override
+    public void copy(DataStorageElementDescriptor dstName,
+                     boolean removeSrc)
+            throws IOException {
+        copy(dstName, null, removeSrc);
+    }
+
+    /** @return the wrapped path */
+    public Path getPath() {
+        return path;
+    }
+
+    /** @return the file system handle of the owning storage */
+    public FileSystem getHFS() {
+        return fs.getHFS();
+    }
+
+    /** @return the string form of the wrapped path */
+    @Override
+    public String toString() {
+        return path.toString();
+    }
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/datastorage/HSeekableInputStream.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,94 @@
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+
+/**
+ * {@link SeekableInputStream} that forwards every operation to a wrapped
+ * {@link FSDataInputStream}; the content length supplied at construction is
+ * used as the reference point for end-relative seeks.
+ */
+public class HSeekableInputStream extends SeekableInputStream {
+
+    protected FSDataInputStream input;
+    protected long contentLength;
+
+    HSeekableInputStream(FSDataInputStream input,
+                         long contentLength) {
+        this.input = input;
+        this.contentLength = contentLength;
+    }
+
+    /**
+     * Moves the read position.
+     *
+     * @param offset distance from the reference point chosen by {@code whence}
+     * @param whence {@code SEEK_SET} (start), {@code SEEK_CUR} (current
+     *               position) or {@code SEEK_END} (content length)
+     * @throws IOException on an unrecognised {@code whence} or if the
+     *                     underlying seek fails
+     */
+    @Override
+    public void seek(long offset, FLAGS whence) throws IOException {
+        final long origin;
+
+        switch (whence) {
+        case SEEK_SET:
+            origin = 0L;
+            break;
+        case SEEK_CUR:
+            origin = input.getPos();
+            break;
+        case SEEK_END:
+            origin = contentLength;
+            break;
+        default:
+            throw new IOException("Invalid seek option: " + whence);
+        }
+
+        input.seek(origin + offset);
+    }
+
+    /** @return the current position of the wrapped stream */
+    @Override
+    public long tell() throws IOException {
+        final long position = input.getPos();
+        return position;
+    }
+
+    @Override
+    public int read() throws IOException {
+        final int value = input.read();
+        return value;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        final int count = input.read(b);
+        return count;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        final int count = input.read(b, off, len);
+        return count;
+    }
+
+    @Override
+    public int available() throws IOException {
+        final int count = input.available();
+        return count;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        final long skipped = input.skip(n);
+        return skipped;
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.input.close();
+    }
+
+    @Override
+    public void mark(int readlimit) {
+        this.input.mark(readlimit);
+    }
+
+    @Override
+    public void reset() throws IOException {
+        this.input.reset();
+    }
+
+    @Override
+    public boolean markSupported() {
+        final boolean supported = input.markSupported();
+        return supported;
+    }
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,68 @@
+package org.apache.pig.backend.hadoop.executionengine;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.executionengine.ExecutionEngineException;
+import org.apache.pig.backend.executionengine.ExecutionEngineLogicalPlan;
+import org.apache.pig.backend.executionengine.ExecutionEnginePhysicalPlan;
+
+import org.apache.pig.impl.physicalLayer.IntermedResult;
+
+/**
+ * Hadoop-side {@link ExecutionEngine}. At this revision it is a skeleton:
+ * apart from setting up the alias table, every operation is a TODO stub.
+ */
+public class HExecutionEngine implements ExecutionEngine {
+
+    /** Alias table: intermediate results keyed by name. */
+    protected Map<String, IntermedResult> aliases;
+
+    /** Starts with an empty alias table. */
+    public HExecutionEngine() {
+        this.aliases = new HashMap<String, IntermedResult>();
+    }
+
+    public void init() {
+        // nothing to do
+    }
+
+    public void close() {
+        // TODO:
+        // check for running jobs and kill them all?
+    }
+
+    /** @return currently always {@code null} (not implemented yet) */
+    public Properties getConfiguration() {
+        // TODO
+        return null;
+    }
+
+    /** Currently ignores its argument (not implemented yet). */
+    public void updateConfiguration(Properties newConfiguration)
+            throws ExecutionEngineException {
+        // TODO
+    }
+
+    /** @return currently always a fresh, empty property set */
+    public Properties getStatistics() throws ExecutionEngineException {
+        // TODO
+        return new Properties();
+    }
+
+    /** @return currently always {@code null} (not implemented yet) */
+    public ExecutionEnginePhysicalPlan compile(ExecutionEngineLogicalPlan plan,
+                                               Properties properties)
+            throws ExecutionEngineException {
+        // TODO
+        // first step is to visit the logical plan:
+        // if LOSplit: need to add more aliases in the table
+        //
+        // compilation process uses the alias table
+        //
+        // emit the physical representation, instead of finding the intermed results in LORead
+        // we need to look into the table.
+        //
+        return null;
+    }
+
+    /** @return currently always {@code null} (not implemented yet) */
+    public Collection<ExecutionEnginePhysicalPlan> physicalPlans()
+            throws ExecutionEngineException {
+        // TODO
+        return null;
+    }
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,131 @@
+package org.apache.pig.backend.local.datastorage;
+
+import java.io.IOException;
+import java.io.File;
+import java.util.Properties;
+
+import org.apache.pig.backend.datastorage.*;
+
+/**
+ * {@link DataStorage} backed by the local file system. Keeps its own
+ * working directory, which starts out as the JVM's {@code user.dir}.
+ */
+public class LocalDataStorage implements DataStorage {
+
+    protected File workingDir;
+
+    public LocalDataStorage() {
+        final String startDir = System.getProperty("user.dir");
+        this.workingDir = new File(startDir);
+    }
+
+    @Override
+    public void init() {
+        // nothing to set up
+    }
+
+    @Override
+    public void close() throws IOException {
+        // nothing to release
+    }
+
+    /** @return a property set whose only entry is a replication factor of 1 */
+    @Override
+    public Properties getConfiguration() {
+        final Properties settings = new Properties();
+        settings.put(DEFAULT_REPLICATION_FACTOR_KEY, Integer.toString(1));
+        return settings;
+    }
+
+    @Override
+    public void updateConfiguration(Properties newConfiguration)
+            throws DataStorageException {
+        // no configurable settings
+    }
+
+    /** @return currently always an empty property set */
+    @Override
+    public Properties getStatistics() throws IOException {
+        //TODO determine used bytes in order to set the value of the
+        // key USED_BYTES_KEY
+        return new Properties();
+    }
+
+    @Override
+    public LocalFile asElement(String name)
+            throws DataStorageException {
+        final LocalFile file = new LocalFile(this, name);
+        return file;
+    }
+
+    @Override
+    public LocalFile asElement(DataStorageElementDescriptor element)
+            throws DataStorageException {
+        final String location = element.toString();
+        return new LocalFile(this, location);
+    }
+
+    @Override
+    public LocalFile asElement(String parent,
+                               String child)
+            throws DataStorageException {
+        final LocalFile file = new LocalFile(this, parent, child);
+        return file;
+    }
+
+    @Override
+    public LocalFile asElement(DataStorageContainerDescriptor parent,
+                               String child)
+            throws DataStorageException {
+        final String parentLocation = parent.toString();
+        return new LocalFile(this, parentLocation, child);
+    }
+
+    @Override
+    public LocalFile asElement(DataStorageContainerDescriptor parent,
+                               DataStorageElementDescriptor child)
+            throws DataStorageException {
+        final String parentLocation = parent.toString();
+        final String childLocation = child.toString();
+        return new LocalFile(this, parentLocation, childLocation);
+    }
+
+    @Override
+    public LocalDir asContainer(String name)
+            throws DataStorageException {
+        final LocalDir dir = new LocalDir(this, name);
+        return dir;
+    }
+
+    @Override
+    public LocalDir asContainer(DataStorageContainerDescriptor container)
+            throws DataStorageException {
+        final String location = container.toString();
+        return new LocalDir(this, location);
+    }
+
+    @Override
+    public LocalDir asContainer(String parent,
+                                String child)
+            throws DataStorageException {
+        final LocalDir dir = new LocalDir(this, parent, child);
+        return dir;
+    }
+
+    @Override
+    public LocalDir asContainer(DataStorageContainerDescriptor parent,
+                                String child)
+            throws DataStorageException {
+        final String parentLocation = parent.toString();
+        return new LocalDir(this, parentLocation, child);
+    }
+
+    @Override
+    public LocalDir asContainer(DataStorageContainerDescriptor parent,
+                                DataStorageContainerDescriptor child)
+            throws DataStorageException {
+        final String parentLocation = parent.toString();
+        final String childLocation = child.toString();
+        return new LocalDir(this, parentLocation, childLocation);
+    }
+
+    /** Replaces the working directory with the given container's path. */
+    @Override
+    public void setActiveContainer(DataStorageContainerDescriptor container) {
+        final String location = container.toString();
+        this.workingDir = new File(location);
+    }
+
+    /** @return the working directory wrapped as a {@link LocalDir} */
+    @Override
+    public DataStorageContainerDescriptor getActiveContainer() {
+        final String location = this.workingDir.getPath();
+        return new LocalDir(this, location);
+    }
+
+    public File getWorkingDir() {
+        return workingDir;
+    }
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDir.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDir.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDir.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalDir.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,154 @@
+package org.apache.pig.backend.local.datastorage;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.Iterator;
+
+import org.apache.pig.backend.datastorage.DataStorageElementDescriptor;
+import org.apache.pig.backend.datastorage.DataStorageContainerDescriptor;
+import org.apache.pig.backend.datastorage.DataStorageException;
+import org.apache.pig.backend.datastorage.ImmutableOutputStream;
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+
+/**
+ * Directory ("container") descriptor for the local-file-system storage.
+ */
+public class LocalDir extends LocalPath
+        implements DataStorageContainerDescriptor {
+
+    public LocalDir(LocalDataStorage fs, String path) {
+        super(fs, path);
+    }
+
+    public LocalDir(LocalDataStorage fs, File path) {
+        super(fs, path);
+    }
+
+    public LocalDir(LocalDataStorage fs, String parent, String child) {
+        super(fs, parent, child);
+    }
+
+    public LocalDir(LocalDataStorage fs, File parent, File child) {
+        super(fs,
+              parent.getPath(),
+              child.getPath());
+    }
+
+    public LocalDir(LocalDataStorage fs, File parent, String child) {
+        this(fs, parent.getPath(), child);
+    }
+
+    public LocalDir(LocalDataStorage fs, String parent, File child) {
+        this(fs, parent, child.getPath());
+    }
+
+    /**
+     * Creates this directory with {@link File#mkdirs()}.
+     *
+     * @param configuration not consulted by this implementation
+     * @return an {@link ImmutableOutputStream} built from this path's string form
+     * @throws IOException if {@code mkdirs} reports failure
+     */
+    @Override
+    public OutputStream create(Properties configuration)
+            throws IOException {
+        if (! getCurPath().mkdirs()) {
+            throw new IOException("Unable to create dirs for: " + this.path);
+        }
+
+        return new ImmutableOutputStream(path.toString());
+    }
+
+    /**
+     * Copies this directory to {@code dstName}, which must be a container.
+     *
+     * @param dstName          destination; {@code null} makes the call a no-op
+     * @param dstConfiguration configuration handed to every child copy
+     * @param removeSrc        when {@code true}, the source is deleted after copying
+     * @throws IOException if {@code dstName} is not a container descriptor or
+     *                     the copy itself fails
+     */
+    @Override
+    public void copy(DataStorageElementDescriptor dstName,
+                     Properties dstConfiguration,
+                     boolean removeSrc)
+            throws IOException {
+        // Report a wrong destination kind as an I/O failure instead of
+        // letting the cast below blow up with a ClassCastException.
+        if (dstName != null
+                && !(dstName instanceof DataStorageContainerDescriptor)) {
+            throw new IOException("Cannot copy dir " + path +
+                                  " to non-container destination " + dstName);
+        }
+
+        copy((DataStorageContainerDescriptor) dstName,
+             dstConfiguration,
+             removeSrc);
+    }
+
+    /**
+     * Copies this directory and its children into a new container.
+     * <p>
+     * The destination must not exist yet; it is created first and then every
+     * child is copied into it (sub-directories through their own
+     * {@code copy}).
+     *
+     * @param dstName          destination container; {@code null} makes the call a no-op
+     * @param dstConfiguration configuration handed to every child copy
+     * @param removeSrc        when {@code true}, children and finally this
+     *                         directory are deleted after being copied
+     * @throws IOException if the source is missing, the destination already
+     *                     exists, the children cannot be listed, or a child
+     *                     copy fails
+     */
+    public void copy(DataStorageContainerDescriptor dstName,
+                     Properties dstConfiguration,
+                     boolean removeSrc)
+            throws IOException {
+        if (dstName == null) {
+            return;
+        }
+
+        if (!exists()) {
+            throw new IOException("Source does not exist " +
+                                  this);
+        }
+
+        if (dstName.exists()) {
+            throw new IOException("Destination already exists " +
+                                  dstName);
+        }
+
+        dstName.create();
+
+        try {
+            // List directly instead of through iterator(): a source that
+            // cannot be listed must fail the copy, not look empty.
+            LinkedList<DataStorageElementDescriptor> children =
+                new LinkedList<DataStorageElementDescriptor>();
+            collectChildren(children);
+
+            for (DataStorageElementDescriptor curElem : children) {
+                String childName = ((LocalPath) curElem).getPath().getName();
+
+                if (curElem instanceof DataStorageContainerDescriptor) {
+                    DataStorageContainerDescriptor dst =
+                        dstName.getDataStorage().asContainer(dstName, childName);
+
+                    curElem.copy(dst, dstConfiguration, removeSrc);
+
+                    if (removeSrc) {
+                        curElem.delete();
+                    }
+                }
+                else {
+                    DataStorageElementDescriptor dst =
+                        dstName.getDataStorage().asElement(dstName, childName);
+
+                    curElem.copy(dst, dstConfiguration, removeSrc);
+                }
+            }
+        }
+        catch (DataStorageException e) {
+            throw new IOException("Failed to copy " + this + " to " + dstName, e);
+        }
+
+        if (removeSrc) {
+            delete();
+        }
+    }
+
+    /**
+     * Always fails: this descriptor refuses to be opened as a stream.
+     *
+     * @throws IOException always
+     */
+    @Override
+    public InputStream open() throws IOException {
+        throw new IOException("Cannot open dir " + path);
+    }
+
+    /**
+     * Always fails: this descriptor refuses to be opened as a seekable stream.
+     *
+     * @throws IOException always
+     */
+    @Override
+    public SeekableInputStream sopen() throws IOException {
+        throw new IOException("Cannot sopen dir " + path);
+    }
+
+    /**
+     * Returns an iterator over a snapshot of this directory's children.
+     * <p>
+     * Listing is best effort: if the directory cannot be listed, or listing
+     * fails part-way, the iterator covers only the children gathered up to
+     * that point (possibly none).
+     *
+     * @return iterator over file and directory descriptors
+     */
+    @Override
+    public Iterator<DataStorageElementDescriptor> iterator() {
+        LinkedList<DataStorageElementDescriptor> elements =
+            new LinkedList<DataStorageElementDescriptor>();
+
+        try {
+            collectChildren(elements);
+        }
+        catch (IOException ignored) {
+            // best effort: the signature cannot report the failure
+        }
+        catch (DataStorageException ignored) {
+            // best effort: the signature cannot report the failure
+        }
+
+        return elements.iterator();
+    }
+
+    /**
+     * Appends a descriptor for every child of this directory to {@code into}:
+     * an element for each regular file, a container for everything else.
+     *
+     * @throws IOException if {@link File#listFiles()} returns {@code null}
+     */
+    private void collectChildren(LinkedList<DataStorageElementDescriptor> into)
+            throws IOException, DataStorageException {
+        File[] files = getCurPath().listFiles();
+
+        // listFiles() yields null rather than an empty array when the path
+        // is not a directory or cannot be read.
+        if (files == null) {
+            throw new IOException("Unable to list dir " + path);
+        }
+
+        for (File f : files) {
+            if (f.isFile()) {
+                into.add(fs.asElement(f.getPath()));
+            }
+            else {
+                into.add(fs.asContainer(f.getPath()));
+            }
+        }
+    }
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalFile.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalFile.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalFile.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,115 @@
+
+package org.apache.pig.backend.local.datastorage;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.FileOutputStream;
+import java.io.FileInputStream;
+import java.util.Properties;
+
+import org.apache.pig.backend.datastorage.*;
+
+public class LocalFile extends LocalPath {
+
+ public LocalFile(LocalDataStorage fs, String path) {
+ super(fs, path);
+ }
+
+ public LocalFile(LocalDataStorage fs, File path) {
+ super(fs, path);
+ }
+
+ public LocalFile(LocalDataStorage fs, String parent, String child) {
+ super(fs, parent, child);
+ }
+
+ public LocalFile(LocalDataStorage fs, File parent, File child) {
+ super(fs,
+ parent.getPath(),
+ child.getPath());
+ }
+
+ public LocalFile(LocalDataStorage fs, File parent, String child) {
+ this(fs, parent.getPath(), child);
+ }
+
+ public LocalFile(LocalDataStorage fs, String parent, File child) {
+ this(fs, parent, child.getPath());
+ }
+
+ @Override
+ public OutputStream create(Properties configuration)
+ throws IOException {
+ if (! getCurPath().createNewFile()) {
+ throw new IOException("Failed to create file " + this.path);
+ }
+
+ return new FileOutputStream(getCurPath());
+ }
+
+ @Override
+ public void copy(DataStorageElementDescriptor dstName,
+ Properties dstConfiguration,
+ boolean removeSrc)
+ throws IOException {
+ if (dstName == null) {
+ return;
+ }
+
+ if (!exists()) {
+ throw new IOException("Source does not exist " +
+ this);
+ }
+
+ if (dstName.exists()) {
+ if (dstName instanceof DataStorageContainerDescriptor) {
+ try {
+ dstName = dstName.getDataStorage().
+ asElement((DataStorageContainerDescriptor) dstName,
+ path.getName());
+ }
+ catch (DataStorageException e) {
+ throw new IOException("Unable to generate element name (src: " +
+ this + ", dst: " + dstName + ")",
+ e);
+ }
+ }
+ }
+
+ InputStream in = null;
+ OutputStream out = null;
+
+ in = this.open();
+ out = dstName.create(dstConfiguration);
+
+ byte[] data = new byte[4 * 1024];
+ int bc;
+ while((bc = in.read(data)) != -1) {
+ out.write(data, 0, bc);
+ }
+
+ out.close();
+
+ if (removeSrc) {
+ delete();
+ }
+ }
+
+ @Override
+ public InputStream open () throws IOException {
+ return new FileInputStream(this.path);
+ }
+
+ @Override
+ public SeekableInputStream sopen() throws IOException {
+ try {
+ return new LocalSeekableInputStream(this.path);
+ }
+ catch (FileNotFoundException e) {
+ throw new IOException("Unable to find " + this.path, e);
+ }
+ }
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalPath.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalPath.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalPath.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalPath.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,155 @@
+package org.apache.pig.backend.local.datastorage;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.File;
+import java.util.Properties;
+
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.DataStorageElementDescriptor;
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+
+/**
+ * Common base of the local-file-system descriptors: pairs a {@link File}
+ * with the storage it belongs to and implements the operations that do not
+ * differ between files and directories.
+ */
+public abstract class LocalPath implements DataStorageElementDescriptor {
+
+    protected DataStorage fs;
+    protected File path;
+
+    /**
+     * Resolves the stored path for actual file-system access.
+     *
+     * @return the stored path itself when it is absolute, otherwise the
+     *         stored path placed under the storage's active container
+     */
+    protected File getCurPath() {
+        if (this.path.isAbsolute()) {
+            return this.path;
+        }
+
+        return new File(fs.getActiveContainer().toString(),
+                        this.path.getPath());
+    }
+
+    public LocalPath(LocalDataStorage fs, String path) {
+        this.fs = fs;
+        this.path = new File(path);
+    }
+
+    public LocalPath(LocalDataStorage fs, File path) {
+        this.fs = fs;
+        this.path = new File(path.getPath());
+    }
+
+    public LocalPath(LocalDataStorage fs, String parent, String child) {
+        this.fs = fs;
+        this.path = new File(parent, child);
+    }
+
+    public LocalPath(LocalDataStorage fs, File parent, File child) {
+        this.fs = fs;
+        this.path = new File(parent.getPath(),
+                             child.getPath());
+    }
+
+    public LocalPath(LocalDataStorage fs, File parent, String child) {
+        this(fs, parent.getPath(), child);
+    }
+
+    public LocalPath(LocalDataStorage fs, String parent, File child) {
+        this(fs, parent, child.getPath());
+    }
+
+    /** @return the storage this descriptor was created for */
+    @Override
+    public DataStorage getDataStorage() {
+        return fs;
+    }
+
+    /** @return the stored (unresolved) path */
+    public File getPath() {
+        return this.path;
+    }
+
+    @Override
+    public abstract OutputStream create(Properties configuration)
+            throws IOException;
+
+    /** Same as {@link #create(Properties)} with a {@code null} configuration. */
+    @Override
+    public OutputStream create()
+            throws IOException {
+        return create(null);
+    }
+
+    @Override
+    public abstract void copy(DataStorageElementDescriptor dstName,
+                              Properties dstConfiguration,
+                              boolean removeSrc)
+            throws IOException;
+
+    /**
+     * Same as {@link #copy(DataStorageElementDescriptor, Properties, boolean)}
+     * with a {@code null} destination configuration.
+     */
+    @Override
+    public void copy(DataStorageElementDescriptor dstName,
+                     boolean removeSrc) throws IOException {
+        copy(dstName, null, removeSrc);
+    }
+
+    @Override
+    public abstract InputStream open() throws IOException;
+
+    @Override
+    public abstract SeekableInputStream sopen() throws IOException;
+
+    /** @return whether the resolved path exists */
+    @Override
+    public boolean exists() throws IOException {
+        return getCurPath().exists();
+    }
+
+    /**
+     * Renames this path to the path of {@code newName}.
+     *
+     * @param newName target descriptor; must be a {@code LocalPath}. A
+     *                {@code null} value makes the call a no-op, as in
+     *                {@code HPath.rename}.
+     * @throws IOException if {@link File#renameTo(File)} reports failure
+     */
+    @Override
+    public void rename(DataStorageElementDescriptor newName)
+            throws IOException {
+        if (newName == null) {
+            return;
+        }
+
+        // Resolve both ends the same way exists()/delete() do, so relative
+        // paths refer to the active container.
+        File source = getCurPath();
+        File target = ((LocalPath) newName).getCurPath();
+
+        if (! source.renameTo(target)) {
+            throw new IOException("Unable to rename " + source +
+                                  " to " + target);
+        }
+    }
+
+    /**
+     * Deletes the resolved path.
+     * <p>
+     * The result of {@link File#delete()} is deliberately not turned into an
+     * error: {@code LocalDir.copy} deletes an already-removed child a second
+     * time and relies on that being harmless.
+     */
+    @Override
+    public void delete() throws IOException {
+        getCurPath().delete();
+    }
+
+    /** @return a property set whose only entry is a block replication of "1" */
+    @Override
+    public Properties getConfiguration() throws IOException {
+        Properties props = new Properties();
+
+        props.put(BLOCK_REPLICATION_KEY, "1");
+
+        return props;
+    }
+
+    @Override
+    public void updateConfiguration(Properties newConfig)
+            throws IOException {
+        // no configurable settings
+    }
+
+    /**
+     * Reports length and last-modified time of the resolved path.
+     *
+     * @return properties holding {@code LENGTH_KEY} and
+     *         {@code MODIFICATION_TIME_KEY} as decimal strings
+     */
+    @Override
+    public Properties getStatistics() throws IOException {
+        Properties stats = new Properties();
+
+        // Use the resolved path so the figures describe the same file that
+        // exists()/delete() operate on.
+        File resolved = getCurPath();
+
+        stats.put(LENGTH_KEY, Long.toString(resolved.length()));
+        stats.put(MODIFICATION_TIME_KEY, Long.toString(resolved.lastModified()));
+
+        return stats;
+    }
+
+    /**
+     * Orders descriptors by their stored paths.
+     *
+     * @param other descriptor to compare with; must be a {@code LocalPath}
+     */
+    @Override
+    public int compareTo(DataStorageElementDescriptor other) {
+        return this.path.compareTo(((LocalPath)other).path);
+    }
+
+    /** @return the string form of the stored path */
+    @Override
+    public String toString() {
+        return this.path.toString();
+    }
+}
Added: incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalSeekableInputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalSeekableInputStream.java?rev=603773&view=auto
==============================================================================
--- incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalSeekableInputStream.java (added)
+++ incubator/pig/branches/plan/src/org/apache/pig/backend/local/datastorage/LocalSeekableInputStream.java Wed Dec 12 15:10:51 2007
@@ -0,0 +1,107 @@
+package org.apache.pig.backend.local.datastorage;
+
+import java.io.RandomAccessFile;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.pig.backend.datastorage.*;
+
+ /**
+  * {@code SeekableInputStream} backed by a {@link RandomAccessFile} that is
+  * opened read-only on a local file.
+  */
+ public class LocalSeekableInputStream extends SeekableInputStream {
+
+     /** Value stored in {@code curMark} when a mark attempt failed. */
+     private static final long INVALID_MARK = -1L;
+
+     protected RandomAccessFile file;
+     /** Position remembered by {@link #mark(int)}; starts at 0. */
+     protected long curMark;
+
+     /**
+      * Opens {@code file} for reading.
+      *
+      * @param file the local file to read
+      * @throws FileNotFoundException if the file cannot be opened for reading
+      */
+     public LocalSeekableInputStream(File file) throws FileNotFoundException {
+         this.file = new RandomAccessFile(file, "r");
+         this.curMark = 0;
+     }
+
+     /**
+      * Moves the read position.
+      *
+      * @param offset distance in bytes, relative to the origin chosen by
+      *        {@code whence}
+      * @param whence {@code SEEK_SET} (start of file), {@code SEEK_CUR}
+      *        (current position) or {@code SEEK_END} (end of file)
+      * @throws IOException if {@code whence} is not one of the three handled
+      *         values or the underlying seek fails
+      */
+     @Override
+     public void seek(long offset, FLAGS whence) throws IOException {
+         long targetPos;
+
+         switch (whence) {
+         case SEEK_SET: {
+             targetPos = offset;
+             break;
+         }
+         case SEEK_CUR: {
+             targetPos = this.file.getFilePointer() + offset;
+             break;
+         }
+         case SEEK_END: {
+             targetPos = this.file.length() + offset;
+             break;
+         }
+         default: {
+             throw new IOException("Invalid seek option: " + whence);
+         }
+         }
+
+         this.file.seek(targetPos);
+     }
+
+     /**
+      * @return the current read position, in bytes from the start of the file
+      * @throws IOException if the position cannot be obtained
+      */
+     @Override
+     public long tell() throws IOException {
+         return this.file.getFilePointer();
+     }
+
+     @Override
+     public int read() throws IOException {
+         return this.file.read();
+     }
+
+     @Override
+     public int read(byte[] b) throws IOException {
+         return this.file.read(b);
+     }
+
+     @Override
+     public int read(byte[] b, int off, int len) throws IOException {
+         return this.file.read(b, off, len);
+     }
+
+     /**
+      * Returns the number of bytes between the current position and the end
+      * of the file, clamped to the {@code int} range. Previously this method
+      * always threw, which broke wrappers such as {@code BufferedInputStream}
+      * that call {@code available()} after a short read.
+      *
+      * @return remaining bytes, or 0 when positioned at or past the end
+      * @throws IOException if the length or position cannot be obtained
+      */
+     @Override
+     public int available() throws IOException {
+         long remaining = this.file.length() - tell();
+
+         if (remaining <= 0) {
+             return 0;
+         }
+
+         return (int) Math.min(remaining, (long) Integer.MAX_VALUE);
+     }
+
+     /**
+      * Skips forward by at most {@code n} bytes without moving past the end
+      * of the file.
+      *
+      * @param n number of bytes requested; values {@code <= 0} skip nothing
+      * @return the number of bytes actually skipped
+      * @throws IOException if the length, position or seek fails
+      */
+     @Override
+     public long skip(long n) throws IOException {
+         if (n <= 0) {
+             return 0;
+         }
+
+         long remaining = this.file.length() - tell();
+         if (remaining <= 0) {
+             return 0;
+         }
+
+         // Report what was really skipped, never more than what is left.
+         long skipped = Math.min(n, remaining);
+         seek(skipped, FLAGS.SEEK_CUR);
+
+         return skipped;
+     }
+
+     @Override
+     public void close() throws IOException {
+         this.file.close();
+     }
+
+     /**
+      * Remembers the current position for a later {@link #reset()}.
+      *
+      * @param readlimit not used; the mark does not expire
+      */
+     @Override
+     public void mark(int readlimit) {
+         try {
+             this.curMark = tell();
+         }
+         catch (IOException e) {
+             // mark() cannot throw a checked exception, so record the failure
+             // and let reset() report it instead of reusing a stale position.
+             this.curMark = INVALID_MARK;
+         }
+     }
+
+     /**
+      * Returns to the position recorded by the last {@link #mark(int)}, or to
+      * the start of the file if {@code mark} was never called.
+      *
+      * @throws IOException if the last mark attempt failed or the seek fails
+      */
+     @Override
+     public void reset() throws IOException {
+         if (this.curMark < 0) {
+             throw new IOException("Mark is invalid");
+         }
+
+         seek(this.curMark, FLAGS.SEEK_SET);
+     }
+
+     @Override
+     public boolean markSupported() {
+         return true;
+     }
+ }