Posted to commits@pig.apache.org by ol...@apache.org on 2008/04/02 22:11:10 UTC
svn commit: r644033 - in /incubator/pig/trunk: ./
lib-src/bzip2/org/apache/tools/bzip2r/ src/org/apache/pig/
src/org/apache/pig/backend/datastorage/
src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/datastorage/ src/org/apac...
Author: olga
Date: Wed Apr 2 13:11:06 2008
New Revision: 644033
URL: http://svn.apache.org/viewvc?rev=644033&view=rev
Log:
PIG-55: addition of custom splitter framework
Added:
incubator/pig/trunk/src/org/apache/pig/Slice.java
incubator/pig/trunk/src/org/apache/pig/Slicer.java
incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
incubator/pig/trunk/src/org/apache/pig/impl/io/ValidatingInputFileSpec.java
incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java
incubator/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java
incubator/pig/trunk/test/org/apache/pig/test/TestParser.java
Removed:
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigSplit.java
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
incubator/pig/trunk/src/org/apache/pig/PigServer.java
incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java
incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java
incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Wed Apr 2 13:11:06 2008
@@ -197,3 +197,5 @@
the top level directory (joa23 via gates).
PIG-94: M3 code update for streaming
+
+ PIG-55: added custom splitter
Modified: incubator/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java (original)
+++ incubator/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java Wed Apr 2 13:11:06 2008
@@ -59,15 +59,10 @@
*/
package org.apache.tools.bzip2r;
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.InputStream;
import java.io.IOException;
-import java.util.Random;
+import java.io.InputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.pig.backend.datastorage.SeekableInputStream;
/**
* An input stream that decompresses from the BZip2 format (without the file
@@ -142,7 +137,7 @@
private int[][] perm = new int[N_GROUPS][MAX_ALPHA_SIZE];
private int[] minLens = new int[N_GROUPS];
- private FSDataInputStream innerBsStream;
+ private SeekableInputStream innerBsStream;
long readLimit = Long.MAX_VALUE;
public long getReadLimit() {
return readLimit;
@@ -182,8 +177,8 @@
private long retPos, oldPos;
- public CBZip2InputStream(FSDataInputStream zStream, int blockSize) throws IOException {
- retPos = oldPos = zStream.getPos();
+ public CBZip2InputStream(SeekableInputStream zStream, int blockSize) throws IOException {
+ retPos = oldPos = zStream.tell();
ll8 = null;
tt = null;
checkComputedCombinedCRC = blockSize == -1;
@@ -193,7 +188,7 @@
setupBlock();
}
- public CBZip2InputStream(FSDataInputStream zStream) throws IOException {
+ public CBZip2InputStream(SeekableInputStream zStream) throws IOException {
this(zStream, -1);
}
@@ -231,7 +226,7 @@
public long getPos() throws IOException{
if (innerBsStream == null)
return retPos;
- long newPos = innerBsStream.getPos();
+ long newPos = innerBsStream.tell();
if (newPos != oldPos){
retPos = oldPos;
@@ -364,7 +359,7 @@
}
}
- private void bsSetStream(FSDataInputStream f) {
+ private void bsSetStream(SeekableInputStream f) {
innerBsStream = f;
bsLive = 0;
bsBuff = 0;
Modified: incubator/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/PigServer.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/PigServer.java Wed Apr 2 13:11:06 2008
@@ -28,13 +28,11 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
@@ -314,7 +312,7 @@
// already submitted to the back-end for compilation and
// execution.
- LogicalPlan readFrom = (LogicalPlan) aliases.get(id);
+ LogicalPlan readFrom = aliases.get(id);
// Run
try {
@@ -351,7 +349,7 @@
if (!aliases.containsKey(id))
throw new IOException("Invalid alias: " + id);
- if (FileLocalizer.fileExists(filename, pigContext)) {
+ if (FileLocalizer.fileExists(filename, pigContext.getDfs())) {
StringBuilder sb = new StringBuilder();
sb.append("Output file ");
sb.append(filename);
@@ -481,19 +479,17 @@
/**
* Returns the length of a file in bytes which exists in the HDFS (accounts for replication).
* @param filename
- * @return
* @throws IOException
*/
public long fileSize(String filename) throws IOException {
DataStorage dfs = pigContext.getDfs();
ElementDescriptor elem = dfs.asElement(filename);
- Map<String, Object> elemProps = elem.getStatistics();
- String length = (String) elemProps.get(ElementDescriptor.LENGTH_KEY);
-
- Properties dfsProps = dfs.getConfiguration();
- String replication = dfsProps.getProperty(DataStorage.DEFAULT_REPLICATION_FACTOR_KEY);
-
- return (new Long(length)).longValue() * (new Integer(replication)).intValue();
+ Map<String, Object> stats = elem.getStatistics();
+ long length = (Long) stats.get(ElementDescriptor.LENGTH_KEY);
+ int replication = (Short) stats
+ .get(ElementDescriptor.BLOCK_REPLICATION_KEY);
+
+ return length * replication;
}
public boolean existsFile(String filename) throws IOException {
Added: incubator/pig/trunk/src/org/apache/pig/Slice.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/Slice.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/Slice.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/Slice.java Wed Apr 2 13:11:06 2008
@@ -0,0 +1,83 @@
+package org.apache.pig;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.data.Tuple;
+
+/**
+ * A grouping of data that can be processed individually by Pig. Instances of
+ * this interface are created by {@link Slicer}, serialized, and sent to nodes
+ * to be processed.
+ * <p>
+ * {@link #getLocations} is called as part of the configuration process to
+ * determine where this Slice should be run for maximal locality with the data
+ * to be read. Once the Slice arrives on the processing node,
+ * {@link #init(DataStorage)} is called to give it access to the
+ * <code>DataStorage</code> it should use to load Tuples. After
+ * <code>init</code> has been called, any of the other methods on this
+ * interface may be called as part of Pig's processing.
+ */
+public interface Slice extends Serializable {
+
+ /**
+ * Returns string representations of all the files that will be used as part
+ * of processing this Slice.
+ * <p>
+ *
+ * This is the only method on Slice that is valid to call before
+ * {@link #init(DataStorage)} has been called.
+ */
+ String[] getLocations();
+
+ /**
+ * Initializes this Slice with the DataStorage it's to use to do its work.
+ * <p>
+ * This will always be called before <code>getLength</code>,
+ * <code>close</code>, <code>getPos</code>, <code>getProgress</code>
+ * and <code>next</code>.
+ */
+ void init(DataStorage store) throws IOException;
+
+ /**
+ * Returns the length in bytes of all of the data that will be processed by
+ * this Slice.
+ * <p>
+ * Only valid to call after {@link #init(DataStorage)} has been called.
+ */
+ long getLength();
+
+ /**
+ * Closes any streams this Slice has opened as part of its work.
+ * <p>
+ * Only valid to call after {@link #init(DataStorage)} has been called.
+ */
+ void close() throws IOException;
+
+ /**
+ * Returns the number of bytes read so far as part of processing this Slice.
+ * <p>
+ * Only valid to call after {@link #init(DataStorage)} has been called.
+ */
+ long getPos() throws IOException;
+
+ /**
+ * Returns the percentage of Slice that is complete from 0.0 to 1.0.
+ * <p>
+ * Only valid to call after {@link #init(DataStorage)} has been called.
+ */
+ float getProgress() throws IOException;
+
+ /**
+ * Loads the next value from this Slice into <code>value</code>.
+ * <p>
+ * Only valid to call after {@link #init(DataStorage)} has been called.
+ *
+ * @param value -
+ * the Tuple to be filled with the next value.
+ * @return - true if there are more Tuples to be read.
+ */
+ boolean next(Tuple value) throws IOException;
+
+}
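
For illustration of the lifecycle described in the javadoc above (getLocations() before init(DataStorage), then getLength/getPos/getProgress/next/close), a minimal Slice that emits a range of integers might look like the sketch below. This is a hypothetical example in the spirit of the RangeSlicer test added under test/ in this commit, not the committed code; the class name and the use of the pre-types Tuple/DataAtom API (appendField, copyFrom) are assumptions.

package org.apache.pig.test;

import java.io.IOException;

import org.apache.pig.Slice;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.data.DataAtom;
import org.apache.pig.data.Tuple;

/** Hypothetical Slice that emits the integers in [start, end) as one-field tuples. */
public class RangeSlice implements Slice {

    public RangeSlice(int start, int end) {
        this.start = start;
        this.end = end;
    }

    public String[] getLocations() {
        return new String[0]; // no backing files, so no locality hints
    }

    public void init(DataStorage store) throws IOException {
        next = start; // nothing to open; just reset the cursor
    }

    public long getLength() {
        return end - start;
    }

    public long getPos() throws IOException {
        return next - start;
    }

    public float getProgress() throws IOException {
        return end == start ? 1.0f : (float) (next - start) / (end - start);
    }

    public boolean next(Tuple value) throws IOException {
        if (next >= end) {
            return false;
        }
        // assumes Tuple.appendField(Datum) and DataAtom(String) from the
        // pre-types data model; copyFrom is used the same way PigSlice uses it
        Tuple t = new Tuple();
        t.appendField(new DataAtom(Integer.toString(next++)));
        value.copyFrom(t);
        return true;
    }

    public void close() throws IOException {
    }

    // assigned during construction
    private int start;
    private int end;

    // set in init(); not meaningful after serialization
    private transient int next;

    private static final long serialVersionUID = 1L;
}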
Added: incubator/pig/trunk/src/org/apache/pig/Slicer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/Slicer.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/Slicer.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/Slicer.java Wed Apr 2 13:11:06 2008
@@ -0,0 +1,32 @@
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.backend.datastorage.DataStorage;
+
+/**
+ * Produces independent slices of data from a given location to be processed in
+ * parallel by Pig.
+ * <p>
+ * If a class implementing this interface is given as the LoadFunc in a Pig
+ * script, it will be used to make slices for that load statement.
+ */
+public interface Slicer {
+ /**
+ * Checks that <code>location</code> can be parsed by this Slicer and, if
+ * the Slicer uses the DataStorage, that it is readable from there. If it
+ * isn't, an IOException with a message explaining why will be thrown.
+ * <p>
+ * This does not ensure that all the data in <code>location</code> is
+ * valid. It's a preflight check that there's some chance of the Slicer
+ * working before actual Slices are created and sent off for processing.
+ */
+ void validate(DataStorage store, String location) throws IOException;
+
+ /**
+ * Creates slices of data from <code>store</code> at <code>location</code>.
+ *
+ * @return the Slices to be serialized and sent out to nodes for processing.
+ */
+ Slice[] slice(DataStorage store, String location) throws IOException;
+}
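
The matching entry point is a Slicer. Continuing the hypothetical RangeSlice sketch above (again an illustration, not the committed code; the actual RangeSlicer test added by this commit is not reproduced here), a Slicer that interprets its location string as a tuple count and splits it into a fixed number of ranges could look like:

package org.apache.pig.test;

import java.io.IOException;

import org.apache.pig.Slice;
import org.apache.pig.Slicer;
import org.apache.pig.backend.datastorage.DataStorage;

/** Hypothetical Slicer: treats the location as an integer count and splits it into ranges. */
public class RangeSlicer implements Slicer {

    public void validate(DataStorage store, String location) throws IOException {
        try {
            Integer.parseInt(location); // preflight check only; no data is read
        } catch (NumberFormatException e) {
            throw new IOException("expected an integer count, got: " + location);
        }
    }

    public Slice[] slice(DataStorage store, String location) throws IOException {
        int count = Integer.parseInt(location);
        int numSlices = 3; // arbitrary parallelism for the sketch
        int chunk = (count + numSlices - 1) / numSlices;
        Slice[] slices = new Slice[numSlices];
        for (int i = 0; i < numSlices; i++) {
            int start = Math.min(i * chunk, count);
            int end = Math.min(start + chunk, count);
            slices[i] = new RangeSlice(start, end);
        }
        return slices;
    }
}

With the parser and PigInputFormat changes below, naming such a class as the load function (for example, A = LOAD '12' USING org.apache.pig.test.RangeSlicer();) should cause ValidatingInputFileSpec to call its validate() up front and its slice() in place of the default PigSlicer when splits are created.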
Modified: incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java Wed Apr 2 13:11:06 2008
@@ -16,9 +16,16 @@
public interface ElementDescriptor extends
Comparable<ElementDescriptor> {
+ /** Available from getConfiguration as a String and getStatistics as a Long. */
public static final String BLOCK_SIZE_KEY = "pig.path.block.size";
+
+ /** Available from getConfiguration as a String and getStatistics as a Short. */
public static final String BLOCK_REPLICATION_KEY = "pig.path.block.replication";
+
+ /** Available from getStatistics as a Long. */
public static final String LENGTH_KEY = "pig.path.length";
+
+ /** Available from getStatistics as a Long. */
public static final String MODIFICATION_TIME_KEY = "pig.path.modification.time";
//
Added: incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java Wed Apr 2 13:11:06 2008
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.executionengine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.pig.Slice;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+import org.apache.pig.backend.datastorage.SeekableInputStream.FLAGS;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.tools.bzip2r.CBZip2InputStream;
+
+/**
+ * Slice that loads data using a LoadFunc.
+ */
+public class PigSlice implements Slice {
+
+ public PigSlice(String path, String parser, long start, long length) {
+ this.file = path;
+ this.start = start;
+ this.length = length;
+ this.parser = parser;
+ }
+
+ public String[] getLocations() {
+ return new String[] { file };
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public void init(DataStorage base) throws IOException {
+ if (parser == null) {
+ loader = new PigStorage();
+ } else {
+ try {
+ loader = (LoadFunc) PigContext.instantiateFuncFromSpec(parser);
+ } catch (Exception exp) {
+ throw new RuntimeException("can't instantiate " + parser);
+ }
+ }
+ fsis = base.asElement(base.getActiveContainer(), file).sopen();
+ fsis.seek(start, FLAGS.SEEK_CUR);
+
+ end = start + getLength();
+
+ if (file.endsWith(".bz") || file.endsWith(".bz2")) {
+ is = new CBZip2InputStream(fsis, 9);
+ } else if (file.endsWith(".gz")) {
+ is = new GZIPInputStream(fsis);
+ // We can't tell how much of the underlying stream GZIPInputStream
+ // has actually consumed
+ end = Long.MAX_VALUE;
+ } else {
+ is = fsis;
+ }
+ loader.bindTo(file.toString(), new BufferedPositionedInputStream(is,
+ start), start, end);
+ }
+
+ public boolean next(Tuple value) throws IOException {
+ Tuple t = loader.getNext();
+ if (t == null) {
+ return false;
+ }
+ value.copyFrom(t);
+ return true;
+ }
+
+ public long getPos() throws IOException {
+ return fsis.tell();
+ }
+
+ public void close() throws IOException {
+ is.close();
+ }
+
+ public float getProgress() throws IOException {
+ float progress = getPos() - start;
+ float finish = getLength();
+ return progress / finish;
+ }
+
+ // assigned during construction
+ String file;
+ long start;
+ long length;
+ String parser;
+
+ // Created as part of init
+ private InputStream is;
+ private SeekableInputStream fsis;
+ private long end;
+ private LoadFunc loader;
+
+ private static final long serialVersionUID = 1L;
+}
Added: incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java Wed Apr 2 13:11:06 2008
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.executionengine;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.Slice;
+import org.apache.pig.Slicer;
+import org.apache.pig.backend.datastorage.ContainerDescriptor;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.impl.io.FileLocalizer;
+
+/**
+ * Creates one slice per block-sized chunk of each file at location. If location
+ * is a glob or a directory, slices are created for every matched file.
+ * <p>
+ *
+ * If individual files at location end with <code>.gz</code> or
+ * <code>.bz2</code>, they will be decompressed before being passed on to the
+ * LoadFunc.
+ */
+public class PigSlicer implements Slicer {
+ /**
+ * @param funcSpec -
+ * the funcSpec for a LoadFunc that can process the data at
+ * location.
+ */
+ public PigSlicer(String funcSpec) {
+ this.funcSpec = funcSpec;
+ }
+
+ public void setSplittable(boolean splittable) {
+ this.splittable = splittable;
+ }
+
+ public Slice[] slice(DataStorage store, String location) throws IOException {
+ validate(store, location);
+ List<Slice> slices = new ArrayList<Slice>();
+ List<ElementDescriptor> paths = new ArrayList<ElementDescriptor>();
+
+ // If you give a non-glob name, asCollection returns a single
+ // element with just that name.
+ ElementDescriptor[] globPaths = store.asCollection(location);
+ for (int m = 0; m < globPaths.length; m++) {
+ paths.add(globPaths[m]);
+ }
+ for (int j = 0; j < paths.size(); j++) {
+ ElementDescriptor fullPath = store.asElement(store
+ .getActiveContainer(), paths.get(j));
+ if (fullPath instanceof ContainerDescriptor) {
+ for (ElementDescriptor child : ((ContainerDescriptor) fullPath)) {
+ paths.add(child);
+ }
+ continue;
+ }
+ Map<String, Object> stats = fullPath.getStatistics();
+ long bs = (Long) (stats.get(ElementDescriptor.BLOCK_SIZE_KEY));
+ long size = (Long) (stats.get(ElementDescriptor.LENGTH_KEY));
+ long pos = 0;
+ String name = fullPath.toString();
+ System.out.println(size + " " + name);
+ if (name.endsWith(".gz") || !splittable) {
+ // Anything that ends with ".gz" or is not splittable must be
+ // processed as a complete file
+ slices.add(new PigSlice(name, funcSpec, 0, size));
+ } else {
+ while (pos < size) {
+ if (pos + bs > size) {
+ bs = size - pos;
+ }
+ slices.add(new PigSlice(name, funcSpec, pos, bs));
+ pos += bs;
+ }
+ }
+ }
+ return slices.toArray(new Slice[slices.size()]);
+ }
+
+ public void validate(DataStorage store, String location) throws IOException {
+ if (!FileLocalizer.fileExists(location, store)) {
+ throw new IOException(store.asElement(location) + " does not exist");
+ }
+ }
+
+ private String funcSpec;
+
+ private boolean splittable;
+}
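
As a rough sketch of driving PigSlicer directly, for example from a small test driver, the following hypothetical snippet (not part of this commit; it assumes an HDataStorage can be built from a default JobConf, as PigInputFormat does with the job's JobConf) prints how many block-sized slices a location produces:

import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.Slice;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.PigSlicer;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;

public class SliceCount {
    public static void main(String[] args) throws Exception {
        DataStorage store = new HDataStorage(new JobConf());
        PigSlicer slicer = new PigSlicer("org.apache.pig.builtin.PigStorage()");
        slicer.setSplittable(true);
        // validate() runs inside slice(); the location may be a file, directory or glob
        Slice[] slices = slicer.slice(store, args[0]);
        System.out.println(slices.length + " slices for " + args[0]);
    }
}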
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Wed Apr 2 13:11:06 2008
@@ -35,20 +35,20 @@
public HDataStorage(HConfiguration conf) throws IOException {
fs = FileSystem.get(conf.getConfiguration());
}
-
+
public void init() { }
public void close() throws IOException {
fs.close();
}
-
+
public Properties getConfiguration() {
Properties props = new HConfiguration(fs.getConf());
-
+
short defaultReplication = fs.getDefaultReplication();
- props.setProperty(DEFAULT_REPLICATION_FACTOR_KEY,
- (new Short(defaultReplication)).toString());
-
+ props.setProperty(DEFAULT_REPLICATION_FACTOR_KEY, ""
+ + defaultReplication);
+
return props;
}
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Wed Apr 2 13:11:06 2008
@@ -8,6 +8,7 @@
import java.util.Map;
import java.util.HashMap;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -123,13 +124,12 @@
public Map<String, Object> getStatistics() throws IOException {
HashMap<String, Object> props = new HashMap<String, Object>();
- Long length = new Long(fs.getHFS().getFileStatus(path).getLen());
+ FileStatus fileStatus = fs.getHFS().getFileStatus(path);
- Long modificationTime = new Long(fs.getHFS().getFileStatus(path).
- getModificationTime());
-
- props.put(LENGTH_KEY, length.toString());
- props.put(MODIFICATION_TIME_KEY, modificationTime.toString());
+ props.put(BLOCK_SIZE_KEY, fileStatus.getBlockSize());
+ props.put(BLOCK_REPLICATION_KEY, fileStatus.getReplication());
+ props.put(LENGTH_KEY, fileStatus.getLen());
+ props.put(MODIFICATION_TIME_KEY, fileStatus.getModificationTime());
return props;
}
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Wed Apr 2 13:11:06 2008
@@ -19,13 +19,10 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.Map;
import java.util.Iterator;
-import org.apache.hadoop.io.WritableComparator;
import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.FunctionInstantiator;
import org.apache.pig.impl.builtin.FindQuantiles;
@@ -38,7 +35,6 @@
import org.apache.pig.impl.eval.GenerateSpec;
import org.apache.pig.impl.eval.ProjectSpec;
import org.apache.pig.impl.eval.CompositeEvalSpec;
-import org.apache.pig.impl.eval.MapLookupSpec;
import org.apache.pig.impl.eval.SortDistinctSpec;
import org.apache.pig.impl.eval.StarSpec;
import org.apache.pig.impl.eval.EvalSpecVisitor;
@@ -53,16 +49,11 @@
import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LOUnion;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.physicalLayer.PlanCompiler;
import org.apache.pig.impl.logicalLayer.OperatorKey;
import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.SortPartitioner;
-import org.apache.pig.backend.hadoop.datastorage.HFile;
-import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
// compiler for mapreduce physical plans
public class MapreducePlanCompiler {
@@ -98,9 +89,7 @@
logicalKey,
pigContext);
- String filename = FileLocalizer.fullPath(materializedResult.outFileSpec.getFileName(), pigContext);
- FileSpec fileSpec = new FileSpec(filename, materializedResult.outFileSpec.getFuncSpec());
- pom.addInputFile(fileSpec);
+ pom.addInputFile(materializedResult.outFileSpec);
pom.mapParallelism = Math.max(pom.mapParallelism, materializedResult.parallelismRequest);
return pom.getOperatorKey();
@@ -188,13 +177,10 @@
logicalKey,
pigContext,
compiledInputs);
- LOLoad loLoad = (LOLoad) lo;
- String filename = FileLocalizer.fullPath(loLoad.getInputFileSpec().getFileName(), pigContext);
- FileSpec fileSpec = new FileSpec(filename, loLoad.getInputFileSpec().getFuncSpec());
- pom.addInputFile(fileSpec);
+ pom.addInputFile(((LOLoad) lo).getInputFileSpec());
pom.mapParallelism = Math.max(pom.mapParallelism, lo.getRequestedParallelism());
- pom.setProperty("pig.input.splitable",
- Boolean.toString(loLoad.isSplitable()));
+ pom.setProperty("pig.input.splittable",
+ Boolean.toString(((LOLoad)lo).isSplittable()));
return pom.getOperatorKey();
}
else if (lo instanceof LOStore) {
@@ -407,7 +393,7 @@
String comparatorFuncName = loSort.getSortSpec().getComparatorName();
if (comparatorFuncName != null) {
sortJob.userComparator =
- (Class<WritableComparator>)PigContext.resolveClassName(
+ PigContext.resolveClassName(
comparatorFuncName);
}
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java Wed Apr 2 13:11:06 2008
@@ -68,8 +68,7 @@
}
}
- PigSplit split = PigInputFormat.PigRecordReader.getPigRecordReader().getPigFileSplit();
- index = split.getIndex();
+ index = PigInputFormat.getActiveSplit().getIndex();
Datum groupName = ((Tuple) key).getField(0);
finalout.group = ((Tuple) key);
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java Wed Apr 2 13:11:06 2008
@@ -18,52 +18,53 @@
package org.apache.pig.backend.hadoop.executionengine.mapreduceExec;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
+import java.util.List;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.pig.LoadFunc;
+import org.apache.pig.Slice;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.PigSlicer;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.eval.EvalSpec;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.ValidatingInputFileSpec;
import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.tools.bzip2r.CBZip2InputStream;
+public class PigInputFormat implements InputFormat<Text, Tuple>,
+ JobConfigurable {
-public class PigInputFormat implements InputFormat<Text, Tuple>, JobConfigurable {
-
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- boolean isSplitable = job.getBoolean("pig.input.splitable", false);
-
- ArrayList<FileSpec> inputs = (ArrayList<FileSpec>) ObjectSerializer.deserialize(job.get("pig.inputs"));
- ArrayList<EvalSpec> mapFuncs = (ArrayList<EvalSpec>) ObjectSerializer.deserialize(job.get("pig.mapFuncs",""));
- ArrayList<EvalSpec> groupFuncs = (ArrayList<EvalSpec>) ObjectSerializer.deserialize(job.get("pig.groupFuncs", ""));
- PigContext pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
- //TODO: don't understand this code
+ @SuppressWarnings("unchecked")
+ public InputSplit[] getSplits(JobConf job, int numSplits)
+ throws IOException {
+ boolean isSplittable = job.getBoolean("pig.input.splittable", false);
+ ArrayList<FileSpec> inputs = (ArrayList<FileSpec>) ObjectSerializer
+ .deserialize(job.get("pig.inputs"));
+ ArrayList<EvalSpec> mapFuncs = (ArrayList<EvalSpec>) ObjectSerializer
+ .deserialize(job.get("pig.mapFuncs", ""));
+ ArrayList<EvalSpec> groupFuncs = (ArrayList<EvalSpec>) ObjectSerializer
+ .deserialize(job.get("pig.groupFuncs", ""));
+
+ PigContext pigContext = (PigContext) ObjectSerializer.deserialize(job
+ .get("pig.pigContext"));
+ // TODO: don't understand this code
// added for UNION: set group func arity to match arity of inputs
- if (groupFuncs!=null && groupFuncs.size() != inputs.size()) {
+ if (groupFuncs != null && groupFuncs.size() != inputs.size()) {
groupFuncs = new ArrayList<EvalSpec>();
for (int i = 0; i < groupFuncs.size(); i++) {
- groupFuncs.set(i,null);
+ groupFuncs.set(i, null);
}
}
-
+
if (inputs.size() != mapFuncs.size()) {
StringBuilder sb = new StringBuilder();
sb.append("number of inputs != number of map functions: ");
@@ -74,7 +75,6 @@
sb.append(job.get("pig.mapFuncs", "missing"));
throw new IOException(sb.toString());
}
-
if (groupFuncs!= null && inputs.size() != groupFuncs.size()) {
StringBuilder sb = new StringBuilder();
sb.append("number of inputs != number of group functions: ");
@@ -83,162 +83,52 @@
sb.append(groupFuncs.size());
throw new IOException(sb.toString());
}
-
- ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
+
+ FileSystem fs = FileSystem.get(job);
+ List<SliceWrapper> splits = new ArrayList<SliceWrapper>();
for (int i = 0; i < inputs.size(); i++) {
- Path path = new Path(inputs.get(i).getFileName());
- String parser = inputs.get(i).getFuncSpec();
- FileSystem fs = path.getFileSystem(job);
-
- fs.setWorkingDirectory(new Path("/user", job.getUser()));
- ArrayList<Path> paths = new ArrayList<Path>();
- // If you give a non-glob name, globPaths returns a single
- // element with just that name.
- Path[] globPaths = fs.globPaths(path);
- for (int m = 0; m < globPaths.length; m++) paths.add(globPaths[m]);
- //paths.add(path);
- for (int j = 0; j < paths.size(); j++) {
- Path fullPath = new Path(fs.getWorkingDirectory(), paths.get(j));
- if (fs.getFileStatus(fullPath).isDir()) {
- FileStatus children[] = fs.listStatus(fullPath);
- for(int k = 0; k < children.length; k++) {
- paths.add(children[k].getPath());
- }
- continue;
- }
- long bs = fs.getFileStatus(fullPath).getBlockSize();
- long size = fs.getFileStatus(fullPath).getLen();
- long pos = 0;
- String name = paths.get(j).getName();
- if (name.endsWith(".gz") || !isSplitable) {
- // Anything that ends with a ".gz" or can't be split
- // we must process as a complete file
- splits.add(new PigSplit(pigContext, fs, fullPath, parser, groupFuncs==null ? null : groupFuncs.get(i), mapFuncs.get(i), i, 0, size));
- } else {
- while (pos < size) {
- if (pos + bs > size)
- bs = size - pos;
- splits.add(new PigSplit(pigContext, fs, fullPath, parser, groupFuncs==null ? null : groupFuncs.get(i), mapFuncs.get(i), i, pos, bs));
- pos += bs;
- }
- }
+ DataStorage store = new HDataStorage(job);
+ ValidatingInputFileSpec spec;
+ if (inputs.get(i) instanceof ValidatingInputFileSpec) {
+ spec = (ValidatingInputFileSpec) inputs.get(i);
+ } else {
+ spec = new ValidatingInputFileSpec(inputs.get(i), store);
+ }
+ EvalSpec groupBy = groupFuncs == null ? null : groupFuncs.get(i);
+ if (isSplittable && (spec.getSlicer() instanceof PigSlicer)) {
+ ((PigSlicer)spec.getSlicer()).setSplittable(isSplittable);
+ }
+ Slice[] pigs = spec.getSlicer().slice(store, spec.getFileName());
+ for (Slice split : pigs) {
+ splits.add(new SliceWrapper(split, pigContext, groupBy,
+ mapFuncs.get(i), i, fs));
}
}
- return splits.toArray(new PigSplit[splits.size()]);
+ return splits.toArray(new SliceWrapper[splits.size()]);
}
- public RecordReader<Text, Tuple> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
- PigRecordReader r = new PigRecordReader(job, (PigSplit)split, compressionCodecs);
- return r;
+ public RecordReader<Text, Tuple> getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter) throws IOException {
+ activeSplit.set((SliceWrapper) split);
+ return ((SliceWrapper) split).makeReader(job);
}
- private CompressionCodecFactory compressionCodecs = null;
-
- static public String codecList;
public void configure(JobConf conf) {
- compressionCodecs = new CompressionCodecFactory(conf);
- codecList = conf.get("io.compression.codecs", "none");
}
-
- public static class PigRecordReader implements RecordReader<Text, Tuple> {
- /**
- * This is a tremendously ugly hack to get around the fact that mappers do not have access
- * to their readers. We take advantage of the fact that RecordReader.next and Mapper.map is
- * run on same the thread to share information through a thread local variable.
- */
- static ThreadLocal<PigRecordReader> myReader = new ThreadLocal<PigRecordReader>();
-
- public static PigRecordReader getPigRecordReader() {
- return myReader.get();
- }
-
- private InputStream is;
- private FSDataInputStream fsis;
- private long end;
- private PigSplit split;
- LoadFunc loader;
- CompressionCodecFactory compressionFactory;
- JobConf job;
-
- PigRecordReader(JobConf job, PigSplit split, CompressionCodecFactory compressionFactory) throws IOException {
- this.split = split;
- this.job = job;
- this.compressionFactory = compressionFactory;
- loader = split.getLoadFunction();
- Path path = split.getPath();
- FileSystem fs = path.getFileSystem(job);
- fs.setWorkingDirectory(new Path("/user/" + job.getUser()));
- CompressionCodec codec = compressionFactory.getCodec(split.getPath());
- long start = split.getStart();
- fsis = fs.open(split.getPath());
- fsis.seek(start);
-
- if (codec != null) {
- is = codec.createInputStream(fsis);
- end = Long.MAX_VALUE;
-
- } else{
- end = start + split.getLength();
-
- if (split.file.getName().endsWith(".bz") ||
- split.file.getName().endsWith(".bz2")) {
- is = new CBZip2InputStream(fsis,9);
- }else{
- is = fsis;
- }
- }
- myReader.set(this);
- loader.bindTo(split.getPath().toString(), new BufferedPositionedInputStream(is, start), start, end);
-
- // Mimic FileSplit
- job.set("map.input.file", split.getPath().toString());
- job.setLong("map.input.start", split.getStart());
- job.setLong("map.input.length", split.getLength());
- }
-
- public JobConf getJobConf(){
- return job;
- }
-
- public boolean next(Text key, Tuple value) throws IOException {
- Tuple t = loader.getNext();
- if (t == null) {
- return false;
- }
- key.set(split.getPath().getName());
- value.copyFrom(t);
- return true;
- }
-
- public long getPos() throws IOException {
- return fsis.getPos();
- }
-
- public void close() throws IOException {
- is.close();
- }
-
- public PigSplit getPigFileSplit() {
- return split;
- }
-
- public Text createKey() {
- return new Text();
- }
-
- public Tuple createValue() {
- return new Tuple();
- }
-
- public float getProgress() throws IOException {
- float progress = getPos() - split.getStart();
- float finish = split.getLength();
- return progress/finish;
- }
+ public static SliceWrapper getActiveSplit() {
+ return activeSplit.get();
}
+ /**
+ * This is a tremendously ugly hack to get around the fact that mappers do
+ * not have access to their readers. We take advantage of the fact that
+ * RecordReader.next and Mapper.map are run on the same thread to share
+ * information through a thread local variable.
+ */
+ private static ThreadLocal<SliceWrapper> activeSplit = new ThreadLocal<SliceWrapper>();
+
public void validateInput(JobConf arg0) throws IOException {
}
- }
+}
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java Wed Apr 2 13:11:06 2008
@@ -17,7 +17,6 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapreduceExec;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
@@ -52,7 +51,6 @@
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.LOCogroup;
import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.tools.timer.PerformanceTimerFactory;
/**
@@ -220,7 +218,7 @@
private void setupMapPipe(Properties properties, Reporter reporter)
throws IOException {
- PigSplit split = PigInputFormat.PigRecordReader.getPigRecordReader().getPigFileSplit();
+ SliceWrapper split = PigInputFormat.getActiveSplit();
index = split.getIndex();
EvalSpec evalSpec = split.getEvalSpec();
Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java Wed Apr 2 13:11:06 2008
@@ -0,0 +1,174 @@
+package org.apache.pig.backend.hadoop.executionengine.mapreduceExec;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.pig.Slice;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.eval.EvalSpec;
+
+/**
+ * Wraps a {@link Slice} in an {@link InputSplit} so it's usable by hadoop.
+ */
+public class SliceWrapper implements InputSplit {
+
+ private EvalSpec groupbySpec;
+ private EvalSpec evalSpec;
+ private int index;
+ private PigContext pigContext;
+ private Slice wrapped;
+ private transient FileSystem fs;// transient so it isn't serialized
+ private transient JobConf lastConf;
+
+ public SliceWrapper() {
+ // for deserialization
+ }
+
+ public SliceWrapper(Slice slice, PigContext context, EvalSpec groupbySpec,
+ EvalSpec evalSpec, int index, FileSystem fs) {
+ this.wrapped = slice;
+ this.pigContext = context;
+ this.groupbySpec = groupbySpec;
+ this.evalSpec = evalSpec;
+ this.index = index;
+ this.fs = fs;
+ }
+
+ public EvalSpec getEvalSpec() {
+ return evalSpec;
+ }
+
+ public EvalSpec getGroupbySpec() {
+ return groupbySpec;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public long getLength() throws IOException {
+ return wrapped.getLength();
+ }
+
+ public String[] getLocations() throws IOException {
+ Set<String> locations = new HashSet<String>();
+ for (String loc : wrapped.getLocations()) {
+ Path path = new Path(loc);
+ String hints[][] = fs.getFileCacheHints(path, 0, fs.getFileStatus(
+ path).getLen());
+ for (int i = 0; i < hints.length; i++) {
+ for (int j = 0; j < hints[i].length; j++) {
+ locations.add(hints[i][j]);
+ }
+ }
+ }
+ return locations.toArray(new String[locations.size()]);
+ }
+
+ public JobConf getJobConf() {
+ return lastConf;
+ }
+
+ public RecordReader<Text, Tuple> makeReader(JobConf job) throws IOException {
+ lastConf = job;
+ DataStorage store = new HDataStorage(job);
+ store.setActiveContainer(store.asContainer("/user/" + job.getUser()));
+ wrapped.init(store);
+ return new RecordReader<Text, Tuple>() {
+
+ public void close() throws IOException {
+ wrapped.close();
+ }
+
+ public Text createKey() {
+ return new Text();
+ }
+
+ public Tuple createValue() {
+ return new Tuple();
+ }
+
+ public long getPos() throws IOException {
+ return wrapped.getPos();
+ }
+
+ public float getProgress() throws IOException {
+ return wrapped.getProgress();
+ }
+
+ public boolean next(Text key, Tuple value) throws IOException {
+ return wrapped.next(value);
+ }
+ };
+ }
+
+ public void readFields(DataInput is) throws IOException {
+ pigContext = (PigContext) readObject(is);
+
+ groupbySpec = (EvalSpec) readObject(is);
+ if (groupbySpec != null) {
+ groupbySpec.instantiateFunc(pigContext);
+ }
+ evalSpec = (EvalSpec) readObject(is);
+ if (evalSpec != null) {
+ evalSpec.instantiateFunc(pigContext);
+ }
+ index = is.readInt();
+ wrapped = (Slice) readObject(is);
+ }
+
+ private IOException wrapException(Exception e) {
+ IOException newE = new IOException(e.getMessage());
+ newE.initCause(e);
+ return newE;
+ }
+
+ private Object readObject(DataInput is) throws IOException {
+ byte[] bytes = new byte[is.readInt()];
+ is.readFully(bytes);
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
+ bytes));
+ try {
+ return ois.readObject();
+ } catch (ClassNotFoundException cnfe) {
+ IOException newE = wrapException(cnfe);
+ throw newE;
+ }
+ }
+
+ public void write(DataOutput os) throws IOException {
+ writeObject(pigContext, os);
+ writeObject(groupbySpec, os);
+ writeObject(evalSpec, os);
+ os.writeInt(index);
+ writeObject(wrapped, os);
+ }
+
+ private void writeObject(Serializable obj, DataOutput os)
+ throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(obj);
+ byte[] bytes = baos.toByteArray();
+ os.writeInt(bytes.length);
+ os.write(bytes);
+ }
+
+}
Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java Wed Apr 2 13:11:06 2008
@@ -26,7 +26,7 @@
public Properties getConfiguration() {
Properties config = new Properties();
- config.put(DEFAULT_REPLICATION_FACTOR_KEY, (new Integer(1)).toString());
+ config.put(DEFAULT_REPLICATION_FACTOR_KEY, "" + 1);
return config;
}
Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java Wed Apr 2 13:11:06 2008
@@ -123,10 +123,12 @@
Map<String, Object> stats = new HashMap<String, Object>();
long size = this.path.length();
- stats.put(LENGTH_KEY , (new Long(size)).toString());
+ stats.put(LENGTH_KEY, size);
+
+ stats.put(BLOCK_REPLICATION_KEY, (short) 1);
long lastModified = this.path.lastModified();
- stats.put(MODIFICATION_TIME_KEY, (new Long(lastModified)).toString());
+ stats.put(MODIFICATION_TIME_KEY, lastModified);
return stats;
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Wed Apr 2 13:11:06 2008
@@ -17,7 +17,6 @@
*/
package org.apache.pig.impl.io;
-import java.lang.IllegalArgumentException;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -26,26 +25,24 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Random;
-import java.util.Stack;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Random;
+import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.WrappedIOException;
-
-import org.apache.pig.backend.datastorage.*;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigInputFormat;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigInputFormat.PigRecordReader;
-
-import java.util.Properties;
+import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.SliceWrapper;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.WrappedIOException;
public class FileLocalizer {
private static final Log log = LogFactory.getLog(FileLocalizer.class);
@@ -141,19 +138,17 @@
}
/**
- * This function is meant to be used if the mappers/reducers want to access any HDFS file
- * @param fileName
- * @return
- * @throws IOException
+ * This function is meant to be used if the mappers/reducers want to access
+ * any HDFS file
*/
-
- public static InputStream openDFSFile(String fileName) throws IOException{
- PigRecordReader prr = PigInputFormat.PigRecordReader.getPigRecordReader();
-
- if (prr == null)
- throw new RuntimeException("can't open DFS file while executing locally");
-
- return openDFSFile(fileName, prr.getJobConf());
+ public static InputStream openDFSFile(String fileName) throws IOException {
+ SliceWrapper wrapper = PigInputFormat.getActiveSplit();
+
+ if (wrapper == null)
+ throw new RuntimeException(
+ "can't open DFS file while executing locally");
+
+ return openDFSFile(fileName, wrapper.getJobConf());
}
@@ -335,21 +330,16 @@
}
}
- public static boolean fileExists(String filename, PigContext pigContext) throws IOException {
- try
- {
- ElementDescriptor elem = pigContext.getDfs().asElement(filename);
+ public static boolean fileExists(String filename, PigContext context)
+ throws IOException {
+ return fileExists(filename, context.getDfs());
+ }
- if (elem.exists()) {
- return true;
- }
- else {
- return globMatchesFiles(elem, pigContext.getDfs());
- }
- }
- catch (DataStorageException e) {
- return false;
- }
+ public static boolean fileExists(String filename, DataStorage store)
+ throws IOException {
+ ElementDescriptor elem = store.asElement(filename);
+
+ return elem.exists() || globMatchesFiles(elem, store);
}
private static boolean globMatchesFiles(ElementDescriptor elem,
Added: incubator/pig/trunk/src/org/apache/pig/impl/io/ValidatingInputFileSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/ValidatingInputFileSpec.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/ValidatingInputFileSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/ValidatingInputFileSpec.java Wed Apr 2 13:11:06 2008
@@ -0,0 +1,59 @@
+package org.apache.pig.impl.io;
+
+import java.io.IOException;
+
+import org.apache.pig.Slicer;
+import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.PigSlicer;
+import org.apache.pig.impl.PigContext;
+
+/**
+ * Creates a Slicer from the funcSpec given at construction and checks that it
+ * is valid.
+ */
+public class ValidatingInputFileSpec extends FileSpec {
+
+ // Don't send the instantiated slicer over the wire.
+ private transient Slicer slicer;
+
+ private static final long serialVersionUID = 1L;
+
+ public ValidatingInputFileSpec(FileSpec fileSpec, DataStorage store)
+ throws IOException {
+ super(fileSpec.getFileName(), fileSpec.getFuncSpec());
+ validate(store);
+ }
+
+ /**
+ * If the <code>ExecType</code> of <code>context</code> is LOCAL,
+ * validation is not performed.
+ */
+ public ValidatingInputFileSpec(String fileName, String funcSpec,
+ PigContext context) throws IOException {
+
+ super(fileName, funcSpec);
+ if (context.getExecType() != ExecType.LOCAL) {
+ validate(context.getDfs());
+ }
+ }
+
+ private void validate(DataStorage store) throws IOException {
+ getSlicer().validate(store, getFileName());
+ }
+
+ /**
+ * Returns the Slicer created by this spec's funcSpec.
+ */
+ public Slicer getSlicer() {
+ if (slicer == null) {
+ Object loader = PigContext.instantiateFuncFromSpec(getFuncSpec());
+ if (loader instanceof Slicer) {
+ slicer = (Slicer) loader;
+ } else {
+ slicer = new PigSlicer(getFuncSpec());
+ }
+ }
+ return slicer;
+ }
+}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java Wed Apr 2 13:11:06 2008
@@ -23,7 +23,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.LoadFunc;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
@@ -40,20 +39,19 @@
protected int outputType = FIXED;
- protected boolean splitable = true;
+ protected boolean splittable = true;
public LOLoad(Map<OperatorKey, LogicalOperator> opTable,
String scope,
long id,
- FileSpec inputFileSpec, boolean splitable)
+ FileSpec inputFileSpec, boolean splittable)
throws IOException, ParseException {
super(opTable, scope, id);
this.inputFileSpec = inputFileSpec;
- this.splitable = splitable;
+ this.splittable = splittable;
// check if we can instantiate load func
- LoadFunc storageFunc = (LoadFunc) PigContext
- .instantiateFuncFromSpec(inputFileSpec.getFuncSpec());
+ PigContext.instantiateFuncFromSpec(inputFileSpec.getFuncSpec());
// TODO: Handle Schemas defined by Load Functions
schema = new TupleSchema();
@@ -116,8 +114,8 @@
return funcs;
}
- public boolean isSplitable() {
- return splitable;
+ public boolean isSplittable() {
+ return splittable;
}
public void visit(LOVisitor v) {
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java Wed Apr 2 13:11:06 2008
@@ -64,7 +64,7 @@
if (parentLoad) {
EvalSpec spec = e.getSpec();
- if (spec instanceof StreamSpec && !load.isSplitable()) {
+ if (spec instanceof StreamSpec && !load.isSplittable()) {
// Try and optimize if the load and stream input specs match
// and input files are to be processed as-is
StreamSpec streamSpec = (StreamSpec)spec;
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Wed Apr 2 13:11:06 2008
@@ -124,20 +124,12 @@
- String massageFilename(String filename, PigContext pigContext, boolean checkExists)
+ String massageFilename(String filename, PigContext pigContext)
throws IOException, ParseException {
if (pigContext.getExecType() != ExecType.LOCAL) {
if (filename.startsWith(FileLocalizer.LOCAL_PREFIX)) {
filename = FileLocalizer.hadoopify(filename, pigContext);
}
- else
- {
- // make sure that dfs file exists
- if (checkExists && !FileLocalizer.fileExists(filename, pigContext))
- {
- throw new ParseException(FileLocalizer.fullPath(filename, pigContext) + " does not exist");
- }
- }
}
return filename;
}
@@ -689,7 +681,13 @@
funcSpec += continuous ? "('\t','\n','0')" : "()";
}
- lo = new LOLoad(opTable, scope, getNextId(), new FileSpec(massageFilename(filename, pigContext, true), funcSpec), splitable);
+ try {
+ lo = new LOLoad(opTable, scope, getNextId(),
+ new ValidatingInputFileSpec(massageFilename(filename, pigContext),
+ funcSpec, pigContext), splitable);
+ } catch(IOException e) {
+ throw new ParseException(e.getMessage());
+ }
if (continuous)
lo.setOutputType(LogicalOperator.MONOTONE);
return lo;
@@ -1507,7 +1505,7 @@
}
LogicalPlan readFrom = aliases.get(t.image);
- String jobOutputFile = massageFilename(fileName, pigContext, false);
+ String jobOutputFile = massageFilename(fileName, pigContext);
lo = new LOStore(opTable, scope, getNextId(), readFrom.getRoot(),
new FileSpec(jobOutputFile, functionSpec),
false);
@@ -1582,4 +1580,4 @@
{
command.setLogDir(alias);
}
-}
\ No newline at end of file
+}
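With this change the parser no longer probes the file system from massageFilename; the LOAD path instead builds a ValidatingInputFileSpec, and any IOException raised while constructing it is rethrown as a ParseException, so a bad input location still fails at parse time. A rough sketch of the assumed construction-time check follows; it is illustrative only (the committed ValidatingInputFileSpec.java is authoritative), it relies on the getSlicer() fallback shown earlier in this diff, and the PigContext.getDfs() call is an assumption about how the spec reaches a DataStorage.

    // Sketch, not the committed code: validate the location through the
    // resolved Slicer as soon as the spec is created.
    public ValidatingInputFileSpec(String fileName, String funcSpec, PigContext context)
            throws IOException {
        super(fileName, funcSpec);
        // For a plain LoadFunc, getSlicer() yields a PigSlicer, which is
        // assumed to check that the input actually exists.
        getSlicer().validate(context.getDfs(), fileName);
    }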
Modified: incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Wed Apr 2 13:11:06 2008
@@ -13,7 +13,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.fs.permission.AccessControlException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
@@ -306,43 +305,28 @@
if (mDfs.isContainer(curElem.toString())) {
System.out.println(curElem.toString() + "\t<dir>");
- }
- else {
- Properties config = curElem.getConfiguration();
- Map<String, Object> stats = curElem.getStatistics();
-
- String strReplication = config.getProperty(ElementDescriptor.BLOCK_REPLICATION_KEY);
- String strLen = (String) stats.get(ElementDescriptor.LENGTH_KEY);
-
- StringBuilder sb = new StringBuilder();
- sb.append(curElem.toString());
- sb.append("<r ");
- sb.append(strReplication);
- sb.append(">\t");
- sb.append(strLen);
- System.out.println(sb.toString());
+ } else {
+ printLengthAndReplication(curElem);
}
}
- }
- else {
- Properties config = pathDescriptor.getConfiguration();
- Map<String, Object> stats = pathDescriptor.getStatistics();
-
- String strReplication = (String) config.get(ElementDescriptor.BLOCK_REPLICATION_KEY);
- String strLen = (String) stats.get(ElementDescriptor.LENGTH_KEY);
-
- StringBuilder sb = new StringBuilder();
- sb.append(pathDescriptor.toString());
- sb.append("<r ");
- sb.append(strReplication);
- sb.append(">\t");
- sb.append(strLen);
- System.out.println(sb.toString());
+ } else {
+ printLengthAndReplication(pathDescriptor);
}
}
catch (DataStorageException e) {
throw WrappedIOException.wrap("Failed to LS on " + path, e);
}
+ }
+
+ private void printLengthAndReplication(ElementDescriptor elem)
+ throws IOException {
+ Map<String, Object> stats = elem.getStatistics();
+
+ long replication = (Short) stats
+ .get(ElementDescriptor.BLOCK_REPLICATION_KEY);
+ long len = (Long) stats.get(ElementDescriptor.LENGTH_KEY);
+
+ System.out.println(elem.toString() + "<r " + replication + ">\t" + len);
}
protected void processPWD() throws IOException
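The duplicated formatting code in the LS handler collapses into printLengthAndReplication; note that the helper now reads both the replication factor and the length from the element's statistics map (as a Short and a Long respectively), whereas the old code pulled replication out of the configuration properties as a string. For a file of 1048576 bytes replicated three times, the printed line would look like this (illustrative path and values only):

    /user/alice/data.txt<r 3>	1048576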
Added: incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java Wed Apr 2 13:11:06 2008
@@ -0,0 +1,100 @@
+package org.apache.pig.test;
+
+import java.io.IOException;
+
+import org.apache.pig.Slice;
+import org.apache.pig.Slicer;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Makes slices, each containing a single value from 0 to the number of slices - 1.
+ */
+public class RangeSlicer
+ implements Slicer
+{
+
+ /**
+ * Expects location to be a stringified integer, and makes
+ * Integer.parseInt(location) slices. Each slice generates a single value,
+ * its index in the sequence of slices.
+ */
+ public Slice[] slice (DataStorage store, String location)
+ throws IOException
+ {
+ int numslices = Integer.parseInt(location);
+ Slice[] slices = new Slice[numslices];
+ for (int i = 0; i < slices.length; i++) {
+ slices[i] = new SingleValueSlice(i);
+ }
+ return slices;
+ }
+
+ public void validate(DataStorage store, String location) throws IOException {
+ try {
+ Integer.parseInt(location);
+ } catch (NumberFormatException nfe) {
+ throw new IOException(nfe.getMessage());
+ }
+ }
+
+ /**
+ * A Slice that returns a single value from next.
+ */
+ public static class SingleValueSlice
+ implements Slice
+ {
+ public int val;
+
+ private transient boolean read;
+
+ public SingleValueSlice (int value)
+ {
+ this.val = value;
+ }
+
+ public void close ()
+ throws IOException
+ {}
+
+ public long getLength ()
+ {
+ return 1;
+ }
+
+ public String[] getLocations ()
+ {
+ return new String[0];
+ }
+
+ public long getPos ()
+ throws IOException
+ {
+ return read ? 1 : 0;
+ }
+
+ public float getProgress ()
+ throws IOException
+ {
+ return read ? 1 : 0;
+ }
+
+ public void init (DataStorage store)
+ throws IOException
+ {}
+
+ public boolean next (Tuple value)
+ throws IOException
+ {
+ if (!read) {
+ value.appendField(new DataAtom(val));
+ read = true;
+ return true;
+ }
+ return false;
+ }
+
+ private static final long serialVersionUID = 1L;
+ }
+}
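RangeSlicer is the self-contained Slicer used by the new tests: slice() turns the location string into a count and emits that many single-value slices, and each SingleValueSlice produces exactly one tuple. The lifecycle a Slice is expected to support (init, next until it returns false, close, with getPos/getProgress available for reporting) can be seen in miniature in the sketch below; the real driver is SliceWrapper on the Hadoop backend, and this fragment is illustrative only.

    import java.io.IOException;
    import org.apache.pig.Slice;
    import org.apache.pig.backend.datastorage.DataStorage;
    import org.apache.pig.data.Tuple;

    public class SliceDriverSketch {
        // Drains one slice the way a reader is assumed to: init, then next()
        // until it reports no more records, then close.
        public static int drain(Slice slice, DataStorage store) throws IOException {
            slice.init(store);
            int count = 0;
            Tuple t = new Tuple();
            while (slice.next(t)) {
                // 't' now carries the record appended by the slice; for
                // SingleValueSlice above that is a single DataAtom, exactly once.
                count++;
            }
            slice.close();
            return count;
        }
    }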
Added: incubator/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java Wed Apr 2 13:11:06 2008
@@ -0,0 +1,34 @@
+package org.apache.pig.test;
+
+import static org.apache.pig.PigServer.ExecType.MAPREDUCE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+import junit.framework.TestCase;
+
+public class TestCustomSlicer extends TestCase{
+ /**
+ * Uses RangeSlicer in place of pig's default Slicer to generate a few
+ * values and count them.
+ */
+ @Test
+ public void testUseRangeSlicer() throws ExecException, IOException {
+ PigServer pig = new PigServer(MAPREDUCE);
+ int numvals = 50;
+ String query = "vals = foreach (group (load '"
+ + numvals
+ + "'using org.apache.pig.test.RangeSlicer()) all) generate COUNT($1);";
+ pig.registerQuery(query);
+ Iterator<Tuple> it = pig.openIterator("vals");
+ Tuple cur = it.next();
+ DataAtom val = cur.getAtomField(0);
+ assertEquals(numvals, (int) val.longVal());
+ }
+}
Added: incubator/pig/trunk/test/org/apache/pig/test/TestParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestParser.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestParser.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestParser.java Wed Apr 2 13:11:06 2008
@@ -0,0 +1,22 @@
+package org.apache.pig.test;
+
+import static org.apache.pig.PigServer.ExecType.MAPREDUCE;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+
+public class TestParser extends TestCase {
+
+ public void testLoadingNonexistentFile() throws ExecException, IOException {
+ PigServer pig = new PigServer(MAPREDUCE);
+ try {
+ pig.registerQuery("vals = load 'nonexistentfile';");
+ fail("Loading a nonexistent file should throw an IOException at parse time");
+ } catch (IOException io) {
+ }
+ }
+}