Posted to commits@pig.apache.org by pr...@apache.org on 2009/10/27 02:13:22 UTC
svn commit: r830041 [3/3] - in /hadoop/pig/branches/load-store-redesign:
src/org/apache/pig/ src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/datastorage/
src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pi...
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java Tue Oct 27 01:13:19 2009
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Map;
+import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.SamplableLoader;
@@ -39,16 +40,18 @@
* Abstract class that specifies the interface for sample loaders
*
*/
+//XXX : FIXME - make this work with new load-store redesign
public abstract class SampleLoader implements LoadFunc {
protected int numSamples;
protected long skipInterval;
- protected SamplableLoader loader;
+ protected LoadFunc loader;
private TupleFactory factory;
+ private boolean initialized = false;
public SampleLoader(String funcSpec) {
- loader = (SamplableLoader)PigContext.instantiateFuncFromSpec(funcSpec);
+ loader = (LoadFunc)PigContext.instantiateFuncFromSpec(funcSpec);
}
public void setNumSamples(int n) {
@@ -59,133 +62,21 @@
return numSamples;
}
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#bindTo(java.lang.String, org.apache.pig.impl.io.BufferedPositionedInputStream, long, long)
- */
- public void bindTo(String fileName, BufferedPositionedInputStream is,
- long offset, long end) throws IOException {
- skipInterval = (end - offset)/numSamples;
- loader.bindTo(fileName, is, offset, end);
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#bytesToBag(byte[])
- */
- public DataBag bytesToBag(byte[] b) throws IOException {
- return loader.bytesToBag(b);
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#bytesToCharArray(byte[])
- */
- public String bytesToCharArray(byte[] b) throws IOException {
- return loader.bytesToCharArray(b);
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#bytesToDouble(byte[])
- */
- public Double bytesToDouble(byte[] b) throws IOException {
- return loader.bytesToDouble(b);
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#bytesToFloat(byte[])
- */
- public Float bytesToFloat(byte[] b) throws IOException {
- return loader.bytesToFloat(b);
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#bytesToInteger(byte[])
- */
- public Integer bytesToInteger(byte[] b) throws IOException {
- return loader.bytesToInteger(b);
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#bytesToLong(byte[])
- */
- public Long bytesToLong(byte[] b) throws IOException {
- return loader.bytesToLong(b);
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#bytesToMap(byte[])
- */
- public Map<String, Object> bytesToMap(byte[] b) throws IOException {
- return loader.bytesToMap(b);
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#bytesToTuple(byte[])
- */
- public Tuple bytesToTuple(byte[] b) throws IOException {
- return loader.bytesToTuple(b);
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
- */
- public Schema determineSchema(String fileName, ExecType execType,
- DataStorage storage) throws IOException {
- return loader.determineSchema(fileName, execType, storage);
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#fieldsToRead(org.apache.pig.impl.logicalLayer.schema.Schema)
- */
- public void fieldsToRead(Schema schema) {
- loader.fieldsToRead(schema);
- }
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#getInputFormat()
+ */
+ @Override
+ public InputFormat getInputFormat() {
+ return loader.getInputFormat();
+ }
- /* (non-Javadoc)
+ /* (non-Javadoc)
* @see org.apache.pig.LoadFunc#getNext()
*/
public Tuple getNext() throws IOException {
- long initialPos = loader.getPosition();
-
- // we move to the next boundary
- Tuple t = loader.getSampledTuple();
- long finalPos = loader.getPosition();
-
- long toSkip = skipInterval - (finalPos - initialPos);
- if (toSkip > 0) {
- long rc = loader.skip(toSkip);
-
- // if we did not skip enough
- // in the first attempt, call
- // in.skip() repeatedly till we
- // skip enough
- long remainingSkip = toSkip - rc;
- while(remainingSkip > 0) {
- rc = loader.skip(remainingSkip);
- if(rc == 0) {
- // underlying stream saw EOF
- break;
- }
- remainingSkip -= rc;
- }
- }
-
- if (t == null) {
- return null;
- }
-
- if (factory == null) {
- factory = TupleFactory.getInstance();
- }
-
- // copy existing field
- Tuple m = factory.newTuple(t.size()+1);
- for(int i=0; i<t.size(); i++) {
- m.set(i, t.get(i));
- }
-
- // add size of the tuple at the end
- m.set(t.size(), (finalPos-initialPos) + 1); // offset 1 for null
-
- return m;
+ // estimate how many tuples there are in the map
+ // based on the
+ return null;
}
public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) throws ExecException {
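
The new getNext() above is only a stub: the removed implementation sampled by skipping skipInterval bytes on the underlying stream, which no longer works now that reading goes through an InputFormat rather than a raw BufferedPositionedInputStream. A minimal sketch of a tuple-count-based replacement, assuming skipInterval is reinterpreted as a number of tuples (an illustration only, not the eventual fix):

    // Hedged sketch: return every skipInterval-th tuple from the wrapped loader.
    public Tuple getNext() throws IOException {
        // skip (skipInterval - 1) tuples between samples
        for (long skipped = 0; skipped < skipInterval - 1; skipped++) {
            if (loader.getNext() == null) {
                return null; // input exhausted while skipping
            }
        }
        return loader.getNext(); // the sampled tuple, or null at end of input
    }
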
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageInputFormat.java?rev=830041&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageInputFormat.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageInputFormat.java Tue Oct 27 01:13:19 2009
@@ -0,0 +1,30 @@
+/**
+ *
+ */
+package org.apache.pig.impl.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pig.data.Tuple;
+
+/**
+ *
+ */
+public class BinStorageInputFormat extends FileInputFormat<Text, Tuple> {
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
+ */
+ @Override
+ public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ return new BinStorageRecordReader();
+ }
+
+}
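
In the redesigned API this InputFormat reaches Pig through the loader's getInputFormat() method rather than through bindTo(). A hedged sketch of how a BinStorage-style loader could expose it (the enclosing LoadFunc implementation is assumed, not part of this diff):

    // Inside a hypothetical LoadFunc implementation:
    @Override
    public InputFormat getInputFormat() {
        // hand Pig the InputFormat that knows how to split and read BinStorage files
        return new BinStorageInputFormat();
    }
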
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageRecordReader.java?rev=830041&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageRecordReader.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageRecordReader.java Tue Oct 27 01:13:19 2009
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataReaderWriter;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Reads BinStorage records: the key is always null and each value is a Tuple.
+ */
+public class BinStorageRecordReader extends RecordReader<Text, Tuple> {
+
+ private long start;
+ private long pos;
+ private long end;
+ private BufferedPositionedInputStream in;
+ private Tuple value = null;
+ public static final int RECORD_1 = 0x01;
+ public static final int RECORD_2 = 0x02;
+ public static final int RECORD_3 = 0x03;
+ private DataInputStream inData = null;
+
+ public void initialize(InputSplit genericSplit,
+ TaskAttemptContext context) throws IOException {
+ FileSplit split = (FileSplit) genericSplit;
+ Configuration job = context.getConfiguration();
+ start = split.getStart();
+ end = start + split.getLength();
+ final Path file = split.getPath();
+
+ // open the file and seek to the start of the split
+ FileSystem fs = file.getFileSystem(job);
+ FSDataInputStream fileIn = fs.open(split.getPath());
+ if (start != 0) {
+ fileIn.seek(start);
+ }
+ in = new BufferedPositionedInputStream(fileIn, start);
+ inData = new DataInputStream(in);
+ }
+
+ public boolean nextKeyValue() throws IOException {
+ int b = 0;
+ // skip to next record
+ while (true) {
+ if (in == null || in.getPosition() >= end) {
+ return false;
+ }
+ // check if we saw RECORD_1 in our last attempt
+ // this can happen if we have the following
+ // sequence RECORD_1-RECORD_1-RECORD_2-RECORD_3
+ // After reading the second RECORD_1 in the above
+ // sequence, we should not look for RECORD_1 again
+ if(b != RECORD_1) {
+ b = in.read();
+ if(b != RECORD_1 && b != -1) {
+ continue;
+ }
+ if(b == -1) return false;
+ }
+ b = in.read();
+ if(b != RECORD_2 && b != -1) {
+ continue;
+ }
+ if(b == -1) return false;
+ b = in.read();
+ if(b != RECORD_3 && b != -1) {
+ continue;
+ }
+ if(b == -1) return false;
+ b = in.read();
+ if(b != DataType.TUPLE && b != -1) {
+ continue;
+ }
+ if(b == -1) return false;
+ break;
+ }
+ try {
+ // if we got here, we have seen RECORD_1-RECORD_2-RECORD_3-TUPLE_MARKER
+ // sequence - let's now read the contents of the tuple
+ value = (Tuple)DataReaderWriter.readDatum(inData, DataType.TUPLE);
+ return true;
+ } catch (ExecException ee) {
+ throw ee;
+ }
+
+ }
+
+ @Override
+ public Text getCurrentKey() {
+ // the key is always null since we don't really have a key for each
+ // input record
+ return null;
+ }
+
+ @Override
+ public Tuple getCurrentValue() {
+ return value;
+ }
+
+ /**
+ * Get the progress within the split
+ */
+ public float getProgress() {
+ if (start == end) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (pos - start) / (float)(end - start));
+ }
+ }
+
+ public synchronized void close() throws IOException {
+ if (in != null) {
+ in.close();
+ }
+ }
+}
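
nextKeyValue() resynchronizes on a four-byte frame: RECORD_1, RECORD_2, RECORD_3, then the DataType.TUPLE type byte that DataReaderWriter itself emits at the front of a serialized tuple. A hedged sketch of the matching writer-side framing (the actual BinStorage writer is not part of this diff; writeRecord is a hypothetical helper):

    // Hypothetical framing that the reader above can resynchronize on.
    void writeRecord(java.io.DataOutputStream out, Tuple t) throws java.io.IOException {
        out.write(BinStorageRecordReader.RECORD_1);
        out.write(BinStorageRecordReader.RECORD_2);
        out.write(BinStorageRecordReader.RECORD_3);
        // writeDatum emits the DataType.TUPLE type byte first, completing the
        // marker sequence the reader scans for, then the tuple's fields.
        org.apache.pig.data.DataReaderWriter.writeDatum(out, t);
    }
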
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java Tue Oct 27 01:13:19 2009
@@ -152,15 +152,7 @@
* @throws IOException
*/
public static InputStream openDFSFile(String fileName) throws IOException {
- SliceWrapper wrapper = PigInputFormat.getActiveSplit();
-
- Configuration conf = null;
- if (wrapper == null) {
- conf = PigMapReduce.sJobConf;
- }else{
- conf = wrapper.getJobConf();
- }
-
+ Configuration conf = PigMapReduce.sJobConf;
if (conf == null) {
throw new RuntimeException(
"can't open DFS file while executing locally");
@@ -177,14 +169,7 @@
}
public static long getSize(String fileName) throws IOException {
- SliceWrapper wrapper = PigInputFormat.getActiveSplit();
-
- Configuration conf = null;
- if (wrapper == null) {
- conf = PigMapReduce.sJobConf;
- }else{
- conf = wrapper.getJobConf();
- }
+ Configuration conf = PigMapReduce.sJobConf;
if (conf == null) {
throw new RuntimeException(
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java Tue Oct 27 01:13:19 2009
@@ -31,6 +31,8 @@
import org.apache.pig.impl.io.FileLocalizer;
+// XXX: FIXME: make this work with load store redesign
+
public class PigFile {
private String file = null;
boolean append = false;
@@ -47,7 +49,8 @@
public DataBag load(LoadFunc lfunc, PigContext pigContext) throws IOException {
DataBag content = BagFactory.getInstance().newDefaultBag();
InputStream is = FileLocalizer.open(file, pigContext);
- lfunc.bindTo(file, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+ //XXX FIXME: make this work with new load-store redesign
+// lfunc.bindTo(file, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
Tuple f = null;
while ((f = lfunc.getNext()) != null) {
content.add(f);
@@ -58,12 +61,12 @@
public void store(DataBag data, StoreFunc sfunc, PigContext pigContext) throws IOException {
BufferedOutputStream bos = new BufferedOutputStream(FileLocalizer.create(file, append, pigContext));
- sfunc.bindTo(bos);
+// sfunc.bindTo(bos);
for (Iterator<Tuple> it = data.iterator(); it.hasNext();) {
Tuple row = it.next();
sfunc.putNext(row);
}
- sfunc.finish();
+// sfunc.finish();
bos.close();
}
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=830041&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java Tue Oct 27 01:13:19 2009
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+
+/**
+ * This is a wrapper Loader which wraps a real LoadFunc underneath and allows
+ * a file to be read completely starting at a given split (indicated by a split
+ * index, which is used to look up the List<InputSplit> returned by the underlying
+ * InputFormat's getSplits() method). So if the supplied split index is 0, this
+ * loader will read the entire file. If it is non-zero, it will read the partial
+ * file beginning from that split to the last split.
+ *
+ * The call sequence to use this is:
+ * 1) construct an object using the constructor
+ * 2) Call getNext() in a loop till it returns null
+ */
+public class ReadToEndLoader implements LoadFunc {
+
+ /**
+ * the wrapped LoadFunc which will do the actual reading
+ */
+ private LoadFunc wrappedLoadFunc;
+
+ /**
+ * the Configuration object used to locate the input location - this will
+ * be used to call {@link LoadFunc#setLocation(String, Configuration)} on
+ * the wrappedLoadFunc
+ */
+ private Configuration conf;
+
+ /**
+ * the input location string (typically input file/dir name)
+ */
+ private String inputLocation;
+
+ /**
+ * the index of the split (in {@link InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)})
+ * to start reading from
+ */
+ private int startSplitIndex;
+
+ /**
+ * the index of the split the loader is currently reading from
+ */
+ private int curSplitIndex;
+
+ /**
+ * the input splits returned by underlying {@link InputFormat#getSplits(JobContext)}
+ */
+ private List<InputSplit> splits;
+
+ /**
+ * underlying RecordReader
+ */
+ private RecordReader reader;
+
+ /**
+ * underlying InputFormat
+ */
+ private InputFormat inputFormat;
+
+ /**
+ * @param wrappedLoadFunc
+ * @param conf
+ * @param inputLocation
+ * @param splitIndex
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
+ String inputLocation, int splitIndex) throws IOException {
+ this.wrappedLoadFunc = wrappedLoadFunc;
+ // make a copy so that if the underlying InputFormat writes to the
+ // conf, we don't affect the caller's copy
+ this.conf = new Configuration(conf);
+ this.inputLocation = inputLocation;
+ this.startSplitIndex = splitIndex;
+ this.curSplitIndex = startSplitIndex;
+
+ // let's initialize the wrappedLoadFunc
+ Job job = new Job(this.conf);
+ wrappedLoadFunc.setLocation(this.inputLocation,
+ job);
+ // The above setLocation call could write to the conf within
+ // the job - get a hold of the modified conf
+ this.conf = job.getConfiguration();
+ inputFormat = wrappedLoadFunc.getInputFormat();
+ try {
+ splits = inputFormat.getSplits(new JobContext(this.conf,
+ new JobID()));
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private boolean initializeReader() throws IOException,
+ InterruptedException {
+ if(curSplitIndex > splits.size() - 1) {
+ // past the last split, we are done
+ return false;
+ }
+
+ InputSplit curSplit = splits.get(curSplitIndex);
+ TaskAttemptContext tAContext = new TaskAttemptContext(conf,
+ new TaskAttemptID());
+ reader = inputFormat.createRecordReader(curSplit, tAContext);
+ reader.initialize(curSplit, tAContext);
+ // create a dummy pigsplit - other than the actual split, the other
+ // params are really not needed here where we are just reading the
+ // input completely
+ PigSplit pigSplit = new PigSplit(curSplit, -1,
+ new ArrayList<OperatorKey>(), -1);
+ wrappedLoadFunc.prepareToRead(reader, pigSplit);
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#getNext()
+ */
+ public Tuple getNext() throws IOException {
+ try {
+ Tuple t = null;
+ if(reader == null) {
+ // first call
+ return getNextHelper();
+ } else {
+ // we already have a reader initialized
+ t = wrappedLoadFunc.getNext();
+ if(t != null) {
+ return t;
+ }
+ // if loadfunc returned null, we need to read next split
+ // if there is one
+ curSplitIndex++;
+ return getNextHelper();
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private Tuple getNextHelper() throws IOException, InterruptedException {
+ Tuple t = null;
+ while(initializeReader()) {
+ t = wrappedLoadFunc.getNext();
+ if(t == null) {
+ // try next split
+ curSplitIndex++;
+ } else {
+ return t;
+ }
+ }
+ // we processed all splits - we are done
+ wrappedLoadFunc.doneReading();
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#doneReading()
+ */
+ @Override
+ public void doneReading() {
+ throw new RuntimeException("Internal Error: Unimplemented method called!");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#getInputFormat()
+ */
+ @Override
+ public InputFormat getInputFormat() {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#getLoadCaster()
+ */
+ @Override
+ public LoadCaster getLoadCaster() {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit)
+ */
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split) {
+ throw new RuntimeException("Internal Error: Unimplemented method called!");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
+ */
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ throw new RuntimeException("Internal Error: Unimplemented method called!");
+ }
+
+}
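
Per the call sequence in the class javadoc, a caller constructs the wrapper and then drains getNext() until it returns null; setLocation(), getSplits() and prepareToRead() all happen inside the constructor and initializeReader(). A small usage sketch, with the wrapped loader and input path as placeholders:

    // Drain an entire input through a wrapped loader, starting from split 0.
    LoadFunc wrapped = new BinStorage(); // placeholder; any LoadFunc on the new API works
    ReadToEndLoader loader = new ReadToEndLoader(
            wrapped, new Configuration(), "/tmp/input", 0);
    Tuple t;
    while ((t = loader.getNext()) != null) {
        System.out.println(t); // process each tuple
    }
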
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java Tue Oct 27 01:13:19 2009
@@ -27,8 +27,11 @@
import java.util.Set;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
@@ -38,12 +41,14 @@
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.util.WrappedIOException;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Job;
public class LOLoad extends RelationalOperator {
private static final long serialVersionUID = 2L;
@@ -145,8 +150,8 @@
}
if(null == mDeterminedSchema) {
- mSchema = mLoadFunc.determineSchema(mSchemaFile, mExecType, mStorage);
- mDeterminedSchema = mSchema;
+ mSchema = determineSchema();
+ mDeterminedSchema = mSchema;
}
mIsSchemaComputed = true;
} catch (IOException ioe) {
@@ -161,6 +166,22 @@
return mSchema;
}
+ private Schema determineSchema() throws IOException {
+ if(LoadMetadata.class.isAssignableFrom(mLoadFunc.getClass())) {
+ // XXX: FIXME - mStorage should no longer be needed, we
+ // should use Configuration directly by passing a
+ // Configuration object while creating LOLoad rather than
+ // a DataStorage object
+ mLoadFunc.setLocation(mInputFileSpec.getFileName(),
+ new Job(ConfigurationUtil.toConfiguration(
+ mStorage.getConfiguration())));
+ LoadMetadata loadMetadata = (LoadMetadata)mLoadFunc;
+ ResourceSchema rSchema = loadMetadata.getSchema();
+ return Schema.getPigSchema(rSchema);
+ } else {
+ return null;
+ }
+ }
/* (non-Javadoc)
* @see org.apache.pig.impl.logicalLayer.LogicalOperator#setSchema(org.apache.pig.impl.logicalLayer.schema.Schema)
*/
@@ -253,7 +274,7 @@
}
} else {
try {
- inputSchema = mLoadFunc.determineSchema(mSchemaFile, mExecType, mStorage);
+ inputSchema = determineSchema();
} catch (IOException ioe) {
mProjectionMap = null;
return mProjectionMap;
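
The new determineSchema() yields a front-end schema only when the loader also implements LoadMetadata; for any other loader it returns null and the schema stays unknown. The same test written with instanceof, as a hedged sketch (the null guard on the returned ResourceSchema is an addition for safety):

    // Equivalent to LoadMetadata.class.isAssignableFrom(mLoadFunc.getClass())
    if (mLoadFunc instanceof LoadMetadata) {
        ResourceSchema rs = ((LoadMetadata) mLoadFunc).getSchema();
        return rs == null ? null : Schema.getPigSchema(rs);
    }
    return null;
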
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Tue Oct 27 01:13:19 2009
@@ -28,6 +28,8 @@
import java.util.Collection;
import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.data.DataType;
//import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.commons.logging.Log;
@@ -672,7 +674,7 @@
public FieldSchema getField(String alias) throws FrontendException {
FieldSchema fs = mAliases.get(alias);
if(null == fs) {
- String cocoPrefix = "::" + alias;
+ String cocoPrefix = new String("::" + alias);
Map<String, Integer> aliasMatches = new HashMap<String, Integer>();
//build the map of aliases that have cocoPrefix as the suffix
for(String key: mAliases.keySet()) {
@@ -798,7 +800,7 @@
if (aliases != null) {
List<String> listAliases = new ArrayList<String>();
for(String alias: aliases) {
- listAliases.add(alias);
+ listAliases.add(new String(alias));
}
for(String alias: listAliases) {
log.debug("Removing alias " + alias + " from multimap");
@@ -1597,6 +1599,34 @@
this.twoLevelAccessRequired = twoLevelAccess;
}
+ public static Schema getPigSchema(ResourceSchema rSchema)
+ throws FrontendException {
+ List<FieldSchema> fsList = new ArrayList<FieldSchema>();
+ for(ResourceFieldSchema rfs : rSchema.fields) {
+ FieldSchema fs = new FieldSchema(rfs.name, rfs.schema == null ? null:
+ getPigSchema(rfs.schema), rfs.type);
+
+ // check if we need to set the twoLevelAccessRequired flag
+ if(rfs.type == DataType.BAG) {
+ if(fs.schema.size() == 1) {
+ FieldSchema innerFs = fs.schema.getField(0);
+ if(innerFs.type == DataType.TUPLE && innerFs.schema != null) {
+ fs.schema.setTwoLevelAccessRequired(true);
+ }
+ }
+ }
+ fsList.add(fs);
+ }
+ return new Schema(fsList);
+ }
+
+ private static Schema getPigSchema(ResourceFieldSchema rfSchema)
+ throws FrontendException {
+ return new Schema(new FieldSchema(rfSchema.name,
+ rfSchema.schema == null ? null : getPigSchema(rfSchema.schema),
+ rfSchema.type));
+ }
+
}
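
getPigSchema() converts a ResourceSchema recursively; its one wrinkle is flagging a bag's inner schema as two-level when the bag wraps a single tuple with a non-null schema (note the added code assumes a BAG field always carries an inner schema, otherwise fs.schema.size() would throw a NullPointerException). A hedged sketch of inspecting the result, with the getter name inferred from the setter used above:

    // After converting a ResourceSchema describing: b: bag{t: tuple(x: int)}
    Schema s = Schema.getPigSchema(rSchema);   // rSchema from LoadMetadata.getSchema()
    Schema bagSchema = s.getField("b").schema; // the bag's inner schema
    boolean twoLevel = bagSchema.isTwoLevelAccessRequired(); // expected: true
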
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/InputHandler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/InputHandler.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/InputHandler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/InputHandler.java Tue Oct 27 01:13:19 2009
@@ -31,6 +31,7 @@
* manner via its <code>stdin</code> or in an {@link InputType#ASYNCHRONOUS}
* manner via an external file which is subsequently read by the executable.
*/
+//XXX: FIXME make this work with new load store redesign
public abstract class InputHandler {
/**
*
@@ -77,7 +78,7 @@
*/
public synchronized void close(Process process) throws IOException {
if(!alreadyClosed) {
- serializer.finish();
+// serializer.finish();
alreadyClosed = true;
}
}
@@ -91,6 +92,6 @@
* @throws IOException
*/
public void bindTo(OutputStream os) throws IOException {
- serializer.bindTo(os);
+// serializer.bindTo(os);
}
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/OutputHandler.java Tue Oct 27 01:13:19 2009
@@ -62,8 +62,9 @@
*/
public void bindTo(String fileName, BufferedPositionedInputStream is,
long offset, long end) throws IOException {
- deserializer.bindTo(fileName, new BufferedPositionedInputStream(is),
- offset, end);
+ // XXX: FIXME - make this work with new load-store redesign
+// deserializer.bindTo(fileName, new BufferedPositionedInputStream(is),
+// offset, end);
}
/**
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/FILTERFROMFILE.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/FILTERFROMFILE.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/FILTERFROMFILE.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/FILTERFROMFILE.java Tue Oct 27 01:13:19 2009
@@ -32,6 +32,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -62,7 +63,7 @@
lookupTable = new HashMap<String, Boolean>();
- Properties props = ConfigurationUtil.toProperties(PigInputFormat.sJob);
+ Properties props = ConfigurationUtil.toProperties(PigMapReduce.sJobConf);
InputStream is = FileLocalizer.openDFSFile(FilterFileName, props);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));