You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:11 UTC
[09/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
deleted file mode 100644
index 4d2b88a..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.Source;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.Target;
-import org.apache.crunch.io.OutputHandler;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-class SourceTargetImpl<T> implements SourceTarget<T> {
-
- protected final Source<T> source;
- protected final Target target;
-
- public SourceTargetImpl(Source<T> source, Target target) {
- this.source = source;
- this.target = target;
- }
-
- @Override
- public PType<T> getType() {
- return source.getType();
- }
-
- @Override
- public void configureSource(Job job, int inputId) throws IOException {
- source.configureSource(job, inputId);
- }
-
- @Override
- public long getSize(Configuration configuration) {
- return source.getSize(configuration);
- }
-
- @Override
- public boolean accept(OutputHandler handler, PType<?> ptype) {
- return target.accept(handler, ptype);
- }
-
- @Override
- public <S> SourceTarget<S> asSourceTarget(PType<S> ptype) {
- return target.asSourceTarget(ptype);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other.getClass().equals(getClass()))) {
- return false;
- }
- SourceTargetImpl sti = (SourceTargetImpl) other;
- return source.equals(sti.source) && target.equals(sti.target);
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(source).append(target).toHashCode();
- }
-
- @Override
- public String toString() {
- return source.toString();
- }
-
- @Override
- public void handleExisting(WriteMode strategy, Configuration conf) {
- target.handleExisting(strategy, conf);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
deleted file mode 100644
index a8ff639..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.PathTarget;
-import org.apache.crunch.io.SequentialFileNamingScheme;
-import org.apache.crunch.types.PTableType;
-
-public class TableSourcePathTargetImpl<K, V> extends SourcePathTargetImpl<Pair<K, V>> implements TableSource<K, V> {
-
- public TableSourcePathTargetImpl(TableSource<K, V> source, PathTarget target) {
- this(source, target, new SequentialFileNamingScheme());
- }
-
- public TableSourcePathTargetImpl(TableSource<K, V> source, PathTarget target, FileNamingScheme fileNamingScheme) {
- super(source, target, fileNamingScheme);
- }
-
- @Override
- public PTableType<K, V> getTableType() {
- return ((TableSource<K, V>) source).getTableType();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java
deleted file mode 100644
index 965b0f9..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.Target;
-import org.apache.crunch.types.PTableType;
-
-public class TableSourceTargetImpl<K, V> extends SourceTargetImpl<Pair<K, V>> implements TableSource<K, V> {
-
- public TableSourceTargetImpl(TableSource<K, V> source, Target target) {
- super(source, target);
- }
-
- @Override
- public PTableType<K, V> getTableType() {
- return ((TableSource<K, V>) source).getTableType();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/package-info.java b/crunch/src/main/java/org/apache/crunch/io/package-info.java
deleted file mode 100644
index 022bc99..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Data input and output for Pipelines.
- */
-package org.apache.crunch.io;
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java
deleted file mode 100644
index ba07506..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.seq;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.writable.WritableType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-
-class SeqFileHelper {
- static <T> Writable newInstance(PType<T> ptype, Configuration conf) {
- return (Writable) ReflectionUtils.newInstance(((WritableType) ptype).getSerializationClass(), conf);
- }
-
- static <T> MapFn<Object, T> getInputMapFn(PType<T> ptype) {
- return ptype.getInputMapFn();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
deleted file mode 100644
index 3f45644..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.seq;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.io.FileReaderFactory;
-import org.apache.crunch.io.impl.AutoClosingIterator;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
-
-public class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
-
- private static final Log LOG = LogFactory.getLog(SeqFileReaderFactory.class);
-
- private final Converter converter;
- private final MapFn<Object, T> mapFn;
- private final Writable key;
- private final Writable value;
-
- public SeqFileReaderFactory(PType<T> ptype) {
- this.converter = ptype.getConverter();
- this.mapFn = ptype.getInputMapFn();
- if (ptype instanceof PTableType) {
- PTableType ptt = (PTableType) ptype;
- this.key = SeqFileHelper.newInstance(ptt.getKeyType(), null);
- this.value = SeqFileHelper.newInstance(ptt.getValueType(), null);
- } else {
- this.key = NullWritable.get();
- this.value = SeqFileHelper.newInstance(ptype, null);
- }
- }
-
- public SeqFileReaderFactory(Class clazz) {
- PType<T> ptype = Writables.writables(clazz);
- this.converter = ptype.getConverter();
- this.mapFn = ptype.getInputMapFn();
- this.key = NullWritable.get();
- this.value = (Writable) ReflectionUtils.newInstance(clazz, null);
- }
-
- @Override
- public Iterator<T> read(FileSystem fs, final Path path) {
- mapFn.initialize();
- try {
- final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, fs.getConf());
- return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
- boolean nextChecked = false;
- boolean hasNext = false;
-
- @Override
- public boolean hasNext() {
- if (nextChecked == true) {
- return hasNext;
- }
- try {
- hasNext = reader.next(key, value);
- nextChecked = true;
- return hasNext;
- } catch (IOException e) {
- LOG.info("Error reading from path: " + path, e);
- return false;
- }
- }
-
- @Override
- public T next() {
- if (!nextChecked && !hasNext()) {
- return null;
- }
- nextChecked = false;
- return mapFn.map(converter.convertInput(key, value));
- }
- });
- } catch (IOException e) {
- LOG.info("Could not read seqfile at path: " + path, e);
- return Iterators.emptyIterator();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
deleted file mode 100644
index 8fac4ae..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.seq;
-
-import java.io.IOException;
-
-import org.apache.crunch.io.CompositePathIterable;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.impl.FileSourceImpl;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-
-public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
-
- public SeqFileSource(Path path, PType<T> ptype) {
- super(path, ptype, SequenceFileInputFormat.class);
- }
-
- @Override
- public Iterable<T> read(Configuration conf) throws IOException {
- FileSystem fs = path.getFileSystem(conf);
- return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype));
- }
-
- @Override
- public String toString() {
- return "SeqFile(" + path.toString() + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java
deleted file mode 100644
index adc739f..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.seq;
-
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.SequentialFileNamingScheme;
-import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.Path;
-
-public class SeqFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
-
- public SeqFileSourceTarget(String path, PType<T> ptype) {
- this(new Path(path), ptype);
- }
-
- public SeqFileSourceTarget(Path path, PType<T> ptype) {
- this(path, ptype, new SequentialFileNamingScheme());
- }
-
- public SeqFileSourceTarget(Path path, PType<T> ptype, FileNamingScheme fileNamingScheme) {
- super(new SeqFileSource<T>(path, ptype), new SeqFileTarget(path), fileNamingScheme);
- }
-
- @Override
- public String toString() {
- return target.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
deleted file mode 100644
index 7a63272..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.seq;
-
-import java.io.IOException;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.io.CompositePathIterable;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.impl.FileTableSourceImpl;
-import org.apache.crunch.types.PTableType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-
-/**
- * A {@code TableSource} that uses {@code SequenceFileInputFormat} to read the input
- * file.
- */
-public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implements ReadableSource<Pair<K, V>> {
-
- public SeqFileTableSource(String path, PTableType<K, V> ptype) {
- this(new Path(path), ptype);
- }
-
- public SeqFileTableSource(Path path, PTableType<K, V> ptype) {
- super(path, ptype, SequenceFileInputFormat.class);
- }
-
- @Override
- public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
- FileSystem fs = path.getFileSystem(conf);
- return CompositePathIterable.create(fs, path,
- new SeqFileReaderFactory<Pair<K, V>>(getTableType()));
- }
-
- @Override
- public String toString() {
- return "SeqFile(" + path.toString() + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java
deleted file mode 100644
index ebdf319..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.seq;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.TableSourceTarget;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.SequentialFileNamingScheme;
-import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
-import org.apache.crunch.types.PTableType;
-import org.apache.hadoop.fs.Path;
-
-public class SeqFileTableSourceTarget<K, V> extends ReadableSourcePathTargetImpl<Pair<K, V>> implements
- TableSourceTarget<K, V> {
- private final PTableType<K, V> tableType;
-
- public SeqFileTableSourceTarget(String path, PTableType<K, V> tableType) {
- this(new Path(path), tableType);
- }
-
- public SeqFileTableSourceTarget(Path path, PTableType<K, V> tableType) {
- this(path, tableType, new SequentialFileNamingScheme());
- }
-
- public SeqFileTableSourceTarget(Path path, PTableType<K, V> tableType, FileNamingScheme fileNamingScheme) {
- super(new SeqFileTableSource<K, V>(path, tableType), new SeqFileTarget(path), fileNamingScheme);
- this.tableType = tableType;
- }
-
- @Override
- public PTableType<K, V> getTableType() {
- return tableType;
- }
-
- @Override
- public String toString() {
- return target.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
deleted file mode 100644
index 60e4739..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.seq;
-
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.SequentialFileNamingScheme;
-import org.apache.crunch.io.impl.FileTargetImpl;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-
-public class SeqFileTarget extends FileTargetImpl {
- public SeqFileTarget(String path) {
- this(new Path(path));
- }
-
- public SeqFileTarget(Path path) {
- this(path, new SequentialFileNamingScheme());
- }
-
- public SeqFileTarget(Path path, FileNamingScheme fileNamingScheme) {
- super(path, SequenceFileOutputFormat.class, fileNamingScheme);
- }
-
- @Override
- public String toString() {
- return "SeqFile(" + path.toString() + ")";
- }
-
- @Override
- public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
- if (ptype instanceof PTableType) {
- return new SeqFileTableSourceTarget(path, (PTableType) ptype);
- } else {
- return new SeqFileSourceTarget(path, ptype);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java b/crunch/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java
deleted file mode 100644
index 67a8870..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-class BZip2TextInputFormat extends FileInputFormat<LongWritable, Text> {
- /**
- * Treats keys as offset in file and value as line. Since the input file is
- * compressed, the offset for a particular line is not well-defined. This
- * implementation returns the starting position of a compressed block as the
- * key for every line in that block.
- */
-
- private static class BZip2LineRecordReader extends RecordReader<LongWritable, Text> {
-
- private long start;
-
- private long end;
-
- private long pos;
-
- private CBZip2InputStream in;
-
- private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
-
- // flag to indicate if previous character read was Carriage Return ('\r')
- // and the next character was not Line Feed ('\n')
- private boolean CRFollowedByNonLF = false;
-
- // in the case where a Carriage Return ('\r') was not followed by a
- // Line Feed ('\n'), this variable will hold that non Line Feed character
- // that was read from the underlying stream.
- private byte nonLFChar;
-
- /**
- * Provide a bridge to get the bytes from the ByteArrayOutputStream without
- * creating a new byte array.
- */
- private static class TextStuffer extends OutputStream {
- public Text target;
-
- @Override
- public void write(int b) {
- throw new UnsupportedOperationException("write(byte) not supported");
- }
-
- @Override
- public void write(byte[] data, int offset, int len) throws IOException {
- target.clear();
- target.set(data, offset, len);
- }
- }
-
- private TextStuffer bridge = new TextStuffer();
-
- private LongWritable key = new LongWritable();
- private Text value = new Text();
-
- public BZip2LineRecordReader(Configuration job, FileSplit split) throws IOException {
- start = split.getStart();
- end = start + split.getLength();
- final Path file = split.getPath();
-
- // open the file and seek to the start of the split
- FileSystem fs = file.getFileSystem(job);
- FSDataInputStream fileIn = fs.open(split.getPath());
- fileIn.seek(start);
-
- in = new CBZip2InputStream(fileIn, 9, end);
- if (start != 0) {
- // skip first line and re-establish "start".
- // LineRecordReader.readLine(this.in, null);
- readLine(this.in, null);
- start = in.getPos();
- }
- pos = in.getPos();
- }
-
- /*
- * LineRecordReader.readLine() is deprecated in Hadoop 0.17, so it is added
- * here locally.
- */
- private long readLine(InputStream in, OutputStream out) throws IOException {
- long bytes = 0;
- while (true) {
- int b = -1;
- if (CRFollowedByNonLF) {
- // In the previous call, a Carriage Return ('\r') was followed
- // by a non Line Feed ('\n') character - in that call we would
- // have not returned the non Line Feed character but would have
- // read it from the stream - let's use that already-read character
- // now
- b = nonLFChar;
- CRFollowedByNonLF = false;
- } else {
- b = in.read();
- }
- if (b == -1) {
- break;
- }
- bytes += 1;
-
- byte c = (byte) b;
- if (c == '\n') {
- break;
- }
-
- if (c == '\r') {
- byte nextC = (byte) in.read();
- if (nextC != '\n') {
- CRFollowedByNonLF = true;
- nonLFChar = nextC;
- } else {
- bytes += 1;
- }
- break;
- }
-
- if (out != null) {
- out.write(c);
- }
- }
- return bytes;
- }
-
- /** Read a line. */
- public boolean next(LongWritable key, Text value) throws IOException {
- if (pos > end)
- return false;
-
- key.set(pos); // key is position
- buffer.reset();
- // long bytesRead = LineRecordReader.readLine(in, buffer);
- long bytesRead = readLine(in, buffer);
- if (bytesRead == 0) {
- return false;
- }
- pos = in.getPos();
- // if we have read ahead because we encountered a carriage return
- // char followed by a non line feed char, decrement the pos
- if (CRFollowedByNonLF) {
- pos--;
- }
-
- bridge.target = value;
- buffer.writeTo(bridge);
- return true;
- }
-
- /**
- * Get the progress within the split
- */
- @Override
- public float getProgress() {
- if (start == end) {
- return 0.0f;
- } else {
- return Math.min(1.0f, (pos - start) / (float) (end - start));
- }
- }
-
- @Override
- public void close() throws IOException {
- in.close();
- }
-
- @Override
- public LongWritable getCurrentKey() throws IOException, InterruptedException {
- return key;
- }
-
- @Override
- public Text getCurrentValue() throws IOException, InterruptedException {
- return value;
- }
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- // no op
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- return next(key, value);
- }
-
- }
-
- @Override
- protected boolean isSplitable(JobContext context, Path file) {
- return true;
- }
-
- @Override
- public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
- try {
- return new BZip2LineRecordReader(context.getConfiguration(), (FileSplit) split);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/CBZip2InputStream.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/CBZip2InputStream.java b/crunch/src/main/java/org/apache/crunch/io/text/CBZip2InputStream.java
deleted file mode 100644
index 92bb787..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/CBZip2InputStream.java
+++ /dev/null
@@ -1,980 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-/**
- * An input stream that decompresses from the BZip2 format (without the file
- * header chars) to be read as any other stream.
- *
- * @author <a href="mailto:keiron@aftexsw.com">Keiron Liddle</a>
- */
-class CBZip2InputStream extends InputStream implements BZip2Constants {
- /**
- * Always throws an IOException carrying {@code reason}. The error helpers
- * visible in this class (EOF, block overrun, bad header, CRC) call it.
- */
- private static void cadvise(String reason) throws IOException {
- throw new IOException(reason);
- }
-
- /**
- * Always throws an IOException with the message "compressedStream EOF";
- * called when the compressed input ends while more bits are needed.
- */
- private static void compressedStreamEOF() throws IOException {
- cadvise("compressedStream EOF");
- }
-
- /**
-  * Rebuilds the byte/sequence-number maps from {@code inUse}: each byte value
-  * flagged in {@code inUse} gets the next sequence number, recorded in both
-  * {@code seqToUnseq} and {@code unseqToSeq}; {@code nInUse} ends up holding
-  * the number of flagged values.
-  */
- private void makeMaps() {
-   nInUse = 0;
-   for (int b = 0; b < 256; b++) {
-     if (!inUse[b]) {
-       continue;
-     }
-     seqToUnseq[nInUse] = (char) b;
-     unseqToSeq[b] = (char) nInUse;
-     nInUse++;
-   }
- }
-
- /*
- * index of the last char in the block, so the block size == last + 1.
- */
- private int last;
-
- /*
- * index in zptr[] of original string after sorting.
- */
- private int origPtr;
-
- /*
- * always: in the range 0 .. 9. The current block size is 100000 * this
- * number.
- */
- private int blockSize100k;
-
- private boolean blockRandomised;
-
- // a buffer to keep the read byte
- private int bsBuff;
-
- // since bzip is bit-aligned at block boundaries there can be a case wherein
- // only few bits out of a read byte are consumed and the remaining bits
- // need to be consumed while processing the next block.
- // indicate how many bits in bsBuff have not been processed yet
- private int bsLive;
- private CRC mCrc = new CRC();
-
- private boolean[] inUse = new boolean[256];
- private int nInUse;
-
- private char[] seqToUnseq = new char[256];
- private char[] unseqToSeq = new char[256];
-
- private char[] selector = new char[MAX_SELECTORS];
- private char[] selectorMtf = new char[MAX_SELECTORS];
-
- private int[] tt;
- private char[] ll8;
-
- /*
- * freq table collected to save a pass over the data during decompression.
- */
- private int[] unzftab = new int[256];
-
- private int[][] limit = new int[N_GROUPS][MAX_ALPHA_SIZE];
- private int[][] base = new int[N_GROUPS][MAX_ALPHA_SIZE];
- private int[][] perm = new int[N_GROUPS][MAX_ALPHA_SIZE];
- private int[] minLens = new int[N_GROUPS];
-
- private FSDataInputStream innerBsStream;
- long readLimit = Long.MAX_VALUE;
-
- /**
- * Returns the cap on compressed bytes read; initBlock() ends the stream
- * once {@code readCount} reaches this value.
- */
- public long getReadLimit() {
- return readLimit;
- }
-
- /**
- * Sets the cap on compressed bytes read. It is checked only at the start
- * of initBlock(), i.e. between blocks.
- */
- public void setReadLimit(long readLimit) {
- this.readLimit = readLimit;
- }
-
- long readCount;
-
- /**
- * Returns the number of calls made to the underlying stream's read();
- * readBs() increments the counter on every call, including one that hits EOF.
- */
- public long getReadCount() {
- return readCount;
- }
-
- private boolean streamEnd = false;
-
- private int currentChar = -1;
-
- private static final int START_BLOCK_STATE = 1;
- private static final int RAND_PART_A_STATE = 2;
- private static final int RAND_PART_B_STATE = 3;
- private static final int RAND_PART_C_STATE = 4;
- private static final int NO_RAND_PART_A_STATE = 5;
- private static final int NO_RAND_PART_B_STATE = 6;
- private static final int NO_RAND_PART_C_STATE = 7;
-
- private int currentState = START_BLOCK_STATE;
-
- private int storedBlockCRC, storedCombinedCRC;
- private int computedBlockCRC, computedCombinedCRC;
- private boolean checkComputedCombinedCRC = true;
-
- int i2, count, chPrev, ch2;
- int i, tPos;
- int rNToGo = 0;
- int rTPos = 0;
- int j2;
- char z;
-
- // see comment in getPos()
- private long retPos = -1;
- // the position offset which corresponds to the end of the InputSplit that
- // will be processed by this instance
- private long endOffsetOfSplit;
-
- private boolean signalToStopReading;
-
- /**
-  * Opens a decompressor over {@code zStream} and decodes its first block.
-  *
-  * @param zStream   the compressed input; its position at construction time is
-  *                  what {@link #getPos()} reports until the split end is crossed
-  * @param blockSize block size in units of 100k, or -1 to read it from the
-  *                  "BZh1".."BZh9" stream header. With -1 the first block must
-  *                  follow the header directly and the combined CRC is checked;
-  *                  otherwise the input is scanned bit by bit for the next
-  *                  block marker and the combined CRC is not checked
-  * @param end       offset of the end of the split handled by this instance
-  * @throws IOException if the input cannot be read or the first block is corrupt
-  */
- public CBZip2InputStream(FSDataInputStream zStream, int blockSize, long end) throws IOException {
-   endOffsetOfSplit = end;
-   // initialize retPos to the beginning of the current InputSplit
-   // see comments in getPos() to understand how this is used.
-   retPos = zStream.getPos();
-   ll8 = null;
-   tt = null;
-   checkComputedCombinedCRC = blockSize == -1;
-   bsSetStream(zStream);
-   initialize(blockSize);
-   if (streamEnd) {
-     // initialize() rejected the stream header and already dropped the stream
-     // reference; initBlock() would dereference it and fail with a
-     // NullPointerException. Leave the instance as an ended stream instead,
-     // so read() returns -1.
-     return;
-   }
-   initBlock(blockSize != -1);
-   setupBlock();
- }
-
- /**
-  * Returns the next decompressed byte, or -1 once the stream has ended, and
-  * pre-computes the byte for the following call.
-  */
- @Override
- public int read() throws IOException {
-   if (streamEnd) {
-     return -1;
-   }
-
-   // The block being read starts at or beyond the end of the split: from now
-   // on report a position past the split end, so the caller stops after the
-   // record it is currently reading. The remaining records of this block
-   // belong to the task that processes the next split.
-   if (signalToStopReading) {
-     retPos = endOffsetOfSplit + 1;
-   }
-
-   final int result = currentChar;
-   switch (currentState) {
-   case RAND_PART_B_STATE:
-     setupRandPartB();
-     break;
-   case RAND_PART_C_STATE:
-     setupRandPartC();
-     break;
-   case NO_RAND_PART_B_STATE:
-     setupNoRandPartB();
-     break;
-   case NO_RAND_PART_C_STATE:
-     setupNoRandPartC();
-     break;
-   default:
-     // START_BLOCK_STATE and both PART_A states: nothing to advance
-     break;
-   }
-   return result;
- }
-
- /**
- * getPos is used by the caller to know when the processing of the current
- * {@link InputSplit} is complete. In this method, as we read each bzip block,
- * we keep returning the beginning of the {@link InputSplit} as the return
- * value until we hit a block which starts at a position >= end of current
- * split. At that point we should set up retpos such that after a record is
- * read, future getPos() calls will get a value > end of current split - this
- * way we will read only one record out of that bzip block - the rest of the
- * records from that bzip block should be read by the next map task while
- * processing the next split
- *
- * @return the stream position captured by the constructor, or
- * {@code endOffsetOfSplit + 1} once read() has seen the stop signal
- * @throws IOException declared for callers; the visible body never throws it
- */
- public long getPos() throws IOException {
- return retPos;
- }
-
- private void initialize(int blockSize) throws IOException {
- if (blockSize == -1) {
- char magic1, magic2;
- char magic3, magic4;
- magic1 = bsGetUChar();
- magic2 = bsGetUChar();
- magic3 = bsGetUChar();
- magic4 = bsGetUChar();
- if (magic1 != 'B' || magic2 != 'Z' || magic3 != 'h' || magic4 < '1' || magic4 > '9') {
- bsFinishedWithStream();
- streamEnd = true;
- return;
- }
- blockSize = magic4 - '0';
- }
-
- setDecompressStructureSizes(blockSize);
- computedCombinedCRC = 0;
- }
-
- private final static long mask = 0xffffffffffffL;
- private final static long eob = 0x314159265359L & mask;
- private final static long eos = 0x177245385090L & mask;
-
- /**
-  * Locates the next 48-bit marker, then either finishes the stream (end of
-  * stream marker) or decodes the block that follows (block marker).
-  *
-  * @param searchForMagic when true, slide bit by bit through the input until
-  *                       a marker is found; when false the next 48 bits must
-  *                       be a marker, otherwise "bad block header" is raised
-  */
- private void initBlock(boolean searchForMagic) throws IOException {
-   if (readCount >= readLimit) {
-     bsFinishedWithStream();
-     streamEnd = true;
-     return;
-   }
-
-   // position before beginning of bzip block header
-   long headerPos = innerBsStream.getPos();
-
-   long magic = 0;
-   for (int k = 0; k < 6; k++) {
-     magic = (magic << 8) | bsGetUChar();
-   }
-
-   if (searchForMagic) {
-     while (magic != eos && magic != eob) {
-       magic = ((magic << 1) & mask) | bsR(1);
-       // a marker found at this point begins 6 bytes before the current
-       // stream position, so when the loop exits headerPos is the start of
-       // the header
-       headerPos = innerBsStream.getPos() - 6;
-     }
-   } else if (magic != eos && magic != eob) {
-     badBlockHeader();
-     streamEnd = true;
-     return;
-   }
-
-   if (magic == eos) {
-     complete();
-     return;
-   }
-
-   // if the previous block finished a few bits into the previous byte, the
-   // first bits of this header came from that byte - so logically the header
-   // starts one byte earlier
-   if (bsLive > 0) {
-     headerPos--;
-   }
-
-   // the block begins at or beyond the end of the split: record that fact so
-   // read() can tell the caller to stop
-   if (headerPos >= endOffsetOfSplit) {
-     signalToStopReading = true;
-   }
-
-   storedBlockCRC = bsGetInt32();
-   blockRandomised = bsR(1) == 1;
-
-   getAndMoveToFrontDecode();
-
-   mCrc.initialiseCRC();
-   currentState = START_BLOCK_STATE;
- }
-
- /**
-  * Finishes the current block: checks its CRC against the stored one and
-  * folds it into the running combined CRC (rotate left by one bit, then XOR).
-  *
-  * @throws IOException with message "CRC error" when the block CRC differs
-  */
- private void endBlock() throws IOException {
-   computedBlockCRC = mCrc.getFinalCRC();
-   /* A bad CRC is considered a fatal error. */
-   if (computedBlockCRC != storedBlockCRC) {
-     crcError();
-   }
-
-   computedCombinedCRC = Integer.rotateLeft(computedCombinedCRC, 1) ^ computedBlockCRC;
- }
-
- /**
- * Handles the end-of-stream marker: reads the stored combined CRC, checks
- * it when checking is enabled, rejects input that continues before the end
- * of the split, then releases the stream and marks it as ended.
- *
- * @throws IOException on a combined-CRC mismatch or when bytes remain in the
- * split after the trailer
- */
- private void complete() throws IOException {
- storedCombinedCRC = bsGetInt32();
- // the constructor enables this check only when it was given blockSize == -1
- if (checkComputedCombinedCRC && storedCombinedCRC != computedCombinedCRC) {
- crcError();
- }
- // anything between the trailer and the split end would be another stream
- if (innerBsStream.getPos() < endOffsetOfSplit) {
- throw new IOException("Encountered additional bytes in the filesplit past the crc block. "
- + "Loading of concatenated bz2 files is not supported");
- }
- bsFinishedWithStream();
- streamEnd = true;
- }
-
- /** Always throws an IOException with the message "block overrun". */
- private static void blockOverrun() throws IOException {
- cadvise("block overrun");
- }
-
- /** Always throws an IOException with the message "bad block header". */
- private static void badBlockHeader() throws IOException {
- cadvise("bad block header");
- }
-
- /** Always throws an IOException with the message "CRC error". */
- private static void crcError() throws IOException {
- cadvise("CRC error");
- }
-
- private void bsFinishedWithStream() {
- if (this.innerBsStream != null) {
- if (this.innerBsStream != System.in) {
- this.innerBsStream = null;
- }
- }
- }
-
- /**
- * Installs {@code f} as the compressed input and empties the bit buffer
- * (no buffered bits, no pending value).
- */
- private void bsSetStream(FSDataInputStream f) {
- innerBsStream = f;
- bsLive = 0;
- bsBuff = 0;
- }
-
- /**
- * Reads one byte from the compressed input and counts the call in
- * {@code readCount}; the counter is bumped even when the read returns -1.
- *
- * @return the value returned by the underlying stream's read()
- */
- final private int readBs() throws IOException {
- readCount++;
- return innerBsStream.read();
- }
-
- private int bsR(int n) throws IOException {
- int v;
- while (bsLive < n) {
- int zzi;
- zzi = readBs();
- if (zzi == -1) {
- compressedStreamEOF();
- }
- bsBuff = (bsBuff << 8) | (zzi & 0xff);
- bsLive += 8;
- }
-
- v = (bsBuff >> (bsLive - n)) & ((1 << n) - 1);
- bsLive -= n;
- return v;
- }
-
- /** Reads the next 8 bits of the compressed input as a value 0..255. */
- private char bsGetUChar() throws IOException {
- return (char) bsR(8);
- }
-
- /**
-  * Reads the next 32 bits of the compressed input as four 8-bit groups,
-  * the first group ending up in the most significant byte.
-  */
- private int bsGetint() throws IOException {
-   int word = 0;
-   for (int k = 0; k < 4; k++) {
-     word = (word << 8) | bsR(8);
-   }
-   return word;
- }
-
- /** Reads the next {@code numBits} bits; a plain alias for {@code bsR}. */
- private int bsGetIntVS(int numBits) throws IOException {
- return bsR(numBits);
- }
-
- /** Reads the next 32 bits; a plain alias for {@code bsGetint}. */
- private int bsGetInt32() throws IOException {
- return bsGetint();
- }
-
- /**
-  * Fills the decode tables of one coding group from its code lengths.
-  *
-  * @param limit     out: per code length, the largest code value of that length
-  *                  (indices 0..MAX_CODE_LEN-1 are reset first)
-  * @param base      out: per code length, the offset subtracted from a code
-  *                  value to index {@code perm} (reset first as well)
-  * @param perm      out: symbols ordered by code length, then by symbol index
-  * @param length    code length of each symbol
-  * @param minLen    shortest code length in {@code length}
-  * @param maxLen    longest code length in {@code length}
-  * @param alphaSize number of symbols to consider
-  */
- private void hbCreateDecodeTables(int[] limit, int[] base, int[] perm, char[] length, int minLen, int maxLen,
-     int alphaSize) {
-   int slot = 0;
-   for (int len = minLen; len <= maxLen; len++) {
-     for (int sym = 0; sym < alphaSize; sym++) {
-       if (length[sym] == len) {
-         perm[slot++] = sym;
-       }
-     }
-   }
-
-   // base[len + 1] first counts the symbols of length len, then becomes a
-   // running total
-   for (int k = 0; k < MAX_CODE_LEN; k++) {
-     base[k] = 0;
-   }
-   for (int sym = 0; sym < alphaSize; sym++) {
-     base[length[sym] + 1]++;
-   }
-   for (int k = 1; k < MAX_CODE_LEN; k++) {
-     base[k] += base[k - 1];
-   }
-
-   for (int k = 0; k < MAX_CODE_LEN; k++) {
-     limit[k] = 0;
-   }
-
-   int vec = 0;
-   for (int len = minLen; len <= maxLen; len++) {
-     vec += base[len + 1] - base[len];
-     limit[len] = vec - 1;
-     vec <<= 1;
-   }
-   for (int len = minLen + 1; len <= maxLen; len++) {
-     base[len] = ((limit[len - 1] + 1) << 1) - base[len];
-   }
- }
-
- private void recvDecodingTables() throws IOException {
- char len[][] = new char[N_GROUPS][MAX_ALPHA_SIZE];
- int i, j, t, nGroups, nSelectors, alphaSize;
- int minLen, maxLen;
- boolean[] inUse16 = new boolean[16];
-
- /* Receive the mapping table */
- for (i = 0; i < 16; i++) {
- if (bsR(1) == 1) {
- inUse16[i] = true;
- } else {
- inUse16[i] = false;
- }
- }
-
- for (i = 0; i < 256; i++) {
- inUse[i] = false;
- }
-
- for (i = 0; i < 16; i++) {
- if (inUse16[i]) {
- for (j = 0; j < 16; j++) {
- if (bsR(1) == 1) {
- inUse[i * 16 + j] = true;
- }
- }
- }
- }
-
- makeMaps();
- alphaSize = nInUse + 2;
-
- /* Now the selectors */
- nGroups = bsR(3);
- nSelectors = bsR(15);
- for (i = 0; i < nSelectors; i++) {
- j = 0;
- while (bsR(1) == 1) {
- j++;
- }
- selectorMtf[i] = (char) j;
- }
-
- /* Undo the MTF values for the selectors. */
- {
- char[] pos = new char[N_GROUPS];
- char tmp, v;
- for (v = 0; v < nGroups; v++) {
- pos[v] = v;
- }
-
- for (i = 0; i < nSelectors; i++) {
- v = selectorMtf[i];
- tmp = pos[v];
- while (v > 0) {
- pos[v] = pos[v - 1];
- v--;
- }
- pos[0] = tmp;
- selector[i] = tmp;
- }
- }
-
- /* Now the coding tables */
- for (t = 0; t < nGroups; t++) {
- int curr = bsR(5);
- for (i = 0; i < alphaSize; i++) {
- while (bsR(1) == 1) {
- if (bsR(1) == 0) {
- curr++;
- } else {
- curr--;
- }
- }
- len[t][i] = (char) curr;
- }
- }
-
- /* Create the Huffman decoding tables */
- for (t = 0; t < nGroups; t++) {
- minLen = 32;
- maxLen = 0;
- for (i = 0; i < alphaSize; i++) {
- if (len[t][i] > maxLen) {
- maxLen = len[t][i];
- }
- if (len[t][i] < minLen) {
- minLen = len[t][i];
- }
- }
- hbCreateDecodeTables(limit[t], base[t], perm[t], len[t], minLen, maxLen, alphaSize);
- minLens[t] = minLen;
- }
- }
-
- private void getAndMoveToFrontDecode() throws IOException {
- char[] yy = new char[256];
- int i, j, nextSym, limitLast;
- int EOB, groupNo, groupPos;
-
- limitLast = baseBlockSize * blockSize100k;
- origPtr = bsGetIntVS(24);
-
- recvDecodingTables();
- EOB = nInUse + 1;
- groupNo = -1;
- groupPos = 0;
-
- /*
- * Setting up the unzftab entries here is not strictly necessary, but it
- * does save having to do it later in a separate pass, and so saves a
- * block's worth of cache misses.
- */
- for (i = 0; i <= 255; i++) {
- unzftab[i] = 0;
- }
-
- for (i = 0; i <= 255; i++) {
- yy[i] = (char) i;
- }
-
- last = -1;
-
- {
- int zt, zn, zvec, zj;
- if (groupPos == 0) {
- groupNo++;
- groupPos = G_SIZE;
- }
- groupPos--;
- zt = selector[groupNo];
- zn = minLens[zt];
- zvec = bsR(zn);
- while (zvec > limit[zt][zn]) {
- zn++;
- {
- {
- while (bsLive < 1) {
- int zzi = 0;
- try {
- zzi = readBs();
- } catch (IOException e) {
- compressedStreamEOF();
- }
- if (zzi == -1) {
- compressedStreamEOF();
- }
- bsBuff = (bsBuff << 8) | (zzi & 0xff);
- bsLive += 8;
- }
- }
- zj = (bsBuff >> (bsLive - 1)) & 1;
- bsLive--;
- }
- zvec = (zvec << 1) | zj;
- }
- nextSym = perm[zt][zvec - base[zt][zn]];
- }
-
- while (true) {
-
- if (nextSym == EOB) {
- break;
- }
-
- if (nextSym == RUNA || nextSym == RUNB) {
- char ch;
- int s = -1;
- int N = 1;
- do {
- if (nextSym == RUNA) {
- s = s + (0 + 1) * N;
- } else if (nextSym == RUNB) {
- s = s + (1 + 1) * N;
- }
- N = N * 2;
- {
- int zt, zn, zvec, zj;
- if (groupPos == 0) {
- groupNo++;
- groupPos = G_SIZE;
- }
- groupPos--;
- zt = selector[groupNo];
- zn = minLens[zt];
- zvec = bsR(zn);
- while (zvec > limit[zt][zn]) {
- zn++;
- {
- {
- while (bsLive < 1) {
- int zzi = 0;
- try {
- zzi = readBs();
- } catch (IOException e) {
- compressedStreamEOF();
- }
- if (zzi == -1) {
- compressedStreamEOF();
- }
- bsBuff = (bsBuff << 8) | (zzi & 0xff);
- bsLive += 8;
- }
- }
- zj = (bsBuff >> (bsLive - 1)) & 1;
- bsLive--;
- }
- zvec = (zvec << 1) | zj;
- }
- nextSym = perm[zt][zvec - base[zt][zn]];
- }
- } while (nextSym == RUNA || nextSym == RUNB);
-
- s++;
- ch = seqToUnseq[yy[0]];
- unzftab[ch] += s;
-
- while (s > 0) {
- last++;
- ll8[last] = ch;
- s--;
- }
-
- if (last >= limitLast) {
- blockOverrun();
- }
- continue;
- } else {
- char tmp;
- last++;
- if (last >= limitLast) {
- blockOverrun();
- }
-
- tmp = yy[nextSym - 1];
- unzftab[seqToUnseq[tmp]]++;
- ll8[last] = seqToUnseq[tmp];
-
- /*
- * This loop is hammered during decompression, hence the unrolling.
- *
- * for (j = nextSym-1; j > 0; j--) yy[j] = yy[j-1];
- */
-
- j = nextSym - 1;
- for (; j > 3; j -= 4) {
- yy[j] = yy[j - 1];
- yy[j - 1] = yy[j - 2];
- yy[j - 2] = yy[j - 3];
- yy[j - 3] = yy[j - 4];
- }
- for (; j > 0; j--) {
- yy[j] = yy[j - 1];
- }
-
- yy[0] = tmp;
- {
- int zt, zn, zvec, zj;
- if (groupPos == 0) {
- groupNo++;
- groupPos = G_SIZE;
- }
- groupPos--;
- zt = selector[groupNo];
- zn = minLens[zt];
- zvec = bsR(zn);
- while (zvec > limit[zt][zn]) {
- zn++;
- {
- {
- while (bsLive < 1) {
- int zzi;
- char thech = 0;
- try {
- thech = (char) readBs();
- } catch (IOException e) {
- compressedStreamEOF();
- }
- zzi = thech;
- bsBuff = (bsBuff << 8) | (zzi & 0xff);
- bsLive += 8;
- }
- }
- zj = (bsBuff >> (bsLive - 1)) & 1;
- bsLive--;
- }
- zvec = (zvec << 1) | zj;
- }
- nextSym = perm[zt][zvec - base[zt][zn]];
- }
- continue;
- }
- }
- }
-
- private void setupBlock() throws IOException {
- int[] cftab = new int[257];
- char ch;
-
- cftab[0] = 0;
- for (i = 1; i <= 256; i++) {
- cftab[i] = unzftab[i - 1];
- }
- for (i = 1; i <= 256; i++) {
- cftab[i] += cftab[i - 1];
- }
-
- for (i = 0; i <= last; i++) {
- ch = ll8[i];
- tt[cftab[ch]] = i;
- cftab[ch]++;
- }
- cftab = null;
-
- tPos = tt[origPtr];
-
- count = 0;
- i2 = 0;
- ch2 = 256; /* not a char and not EOF */
-
- if (blockRandomised) {
- rNToGo = 0;
- rTPos = 0;
- setupRandPartA();
- } else {
- setupNoRandPartA();
- }
- }
-
- /**
-  * Randomised block, state A: fetches the next byte of the block, applies
-  * the randomisation toggle from {@code rNums}, publishes the byte as
-  * {@code currentChar} and moves to state B. When the block is exhausted it
-  * closes the block and sets up the next one instead.
-  */
- private void setupRandPartA() throws IOException {
-   if (i2 > last) {
-     endBlock();
-     initBlock(false);
-     setupBlock();
-     return;
-   }
-
-   chPrev = ch2;
-   ch2 = ll8[tPos];
-   tPos = tt[tPos];
-   if (rNToGo == 0) {
-     rNToGo = rNums[rTPos];
-     rTPos++;
-     if (rTPos == 512) {
-       rTPos = 0;
-     }
-   }
-   rNToGo--;
-   if (rNToGo == 1) {
-     ch2 ^= 1;
-   }
-   i2++;
-
-   currentChar = ch2;
-   currentState = RAND_PART_B_STATE;
-   mCrc.updateCRC(ch2);
- }
-
- /**
-  * Plain block, state A: fetches the next byte of the block, publishes it as
-  * {@code currentChar} and moves to state B. When the block is exhausted it
-  * closes the block and sets up the next one instead.
-  */
- private void setupNoRandPartA() throws IOException {
-   if (i2 > last) {
-     endBlock();
-     initBlock(false);
-     setupBlock();
-     return;
-   }
-
-   chPrev = ch2;
-   ch2 = ll8[tPos];
-   tPos = tt[tPos];
-   i2++;
-
-   currentChar = ch2;
-   currentState = NO_RAND_PART_B_STATE;
-   mCrc.updateCRC(ch2);
- }
-
- /**
-  * Randomised block, state B: tracks how many equal bytes were just emitted.
-  * Below four in a row it returns to state A; on the fourth, the next block
-  * byte (with the randomisation toggle applied) is taken as the repeat count
-  * {@code z} and state C emits the repeats.
-  */
- private void setupRandPartB() throws IOException {
-   if (ch2 != chPrev) {
-     currentState = RAND_PART_A_STATE;
-     count = 1;
-     setupRandPartA();
-     return;
-   }
-
-   count++;
-   if (count < 4) {
-     currentState = RAND_PART_A_STATE;
-     setupRandPartA();
-     return;
-   }
-
-   z = ll8[tPos];
-   tPos = tt[tPos];
-   if (rNToGo == 0) {
-     rNToGo = rNums[rTPos];
-     rTPos++;
-     if (rTPos == 512) {
-       rTPos = 0;
-     }
-   }
-   rNToGo--;
-   if (rNToGo == 1) {
-     z ^= 1;
-   }
-   j2 = 0;
-   currentState = RAND_PART_C_STATE;
-   setupRandPartC();
- }
-
- /**
-  * Randomised block, state C: emits {@code ch2} again until {@code z}
-  * repeats have been produced, then resets the run counter and returns to
-  * state A.
-  */
- private void setupRandPartC() throws IOException {
-   if (j2 >= (int) z) {
-     currentState = RAND_PART_A_STATE;
-     i2++;
-     count = 0;
-     setupRandPartA();
-     return;
-   }
-   currentChar = ch2;
-   mCrc.updateCRC(ch2);
-   j2++;
- }
-
- /**
-  * Plain block, state B: tracks how many equal bytes were just emitted.
-  * Below four in a row it returns to state A; on the fourth, the next block
-  * byte is taken as the repeat count {@code z} and state C emits the repeats.
-  */
- private void setupNoRandPartB() throws IOException {
-   if (ch2 != chPrev) {
-     currentState = NO_RAND_PART_A_STATE;
-     count = 1;
-     setupNoRandPartA();
-     return;
-   }
-
-   count++;
-   if (count < 4) {
-     currentState = NO_RAND_PART_A_STATE;
-     setupNoRandPartA();
-     return;
-   }
-
-   z = ll8[tPos];
-   tPos = tt[tPos];
-   currentState = NO_RAND_PART_C_STATE;
-   j2 = 0;
-   setupNoRandPartC();
- }
-
- /**
-  * Plain block, state C: emits {@code ch2} again until {@code z} repeats
-  * have been produced, then resets the run counter and returns to state A.
-  */
- private void setupNoRandPartC() throws IOException {
-   if (j2 >= (int) z) {
-     currentState = NO_RAND_PART_A_STATE;
-     i2++;
-     count = 0;
-     setupNoRandPartA();
-     return;
-   }
-   currentChar = ch2;
-   mCrc.updateCRC(ch2);
-   j2++;
- }
-
- private void setDecompressStructureSizes(int newSize100k) {
- if (!(0 <= newSize100k && newSize100k <= 9 && 0 <= blockSize100k && blockSize100k <= 9)) {
- // throw new IOException("Invalid block size");
- }
-
- blockSize100k = newSize100k;
-
- if (newSize100k == 0) {
- return;
- }
-
- int n = baseBlockSize * newSize100k;
- ll8 = new char[n];
- tt = new int[n];
- }
-
- private static class CRC {
- public static int crc32Table[] = { 0x00000000, 0x04c11db7, 0x09823b6e, 0x0d4326d9, 0x130476dc, 0x17c56b6b,
- 0x1a864db2, 0x1e475005, 0x2608edb8, 0x22c9f00f, 0x2f8ad6d6, 0x2b4bcb61, 0x350c9b64, 0x31cd86d3, 0x3c8ea00a,
- 0x384fbdbd, 0x4c11db70, 0x48d0c6c7, 0x4593e01e, 0x4152fda9, 0x5f15adac, 0x5bd4b01b, 0x569796c2, 0x52568b75,
- 0x6a1936c8, 0x6ed82b7f, 0x639b0da6, 0x675a1011, 0x791d4014, 0x7ddc5da3, 0x709f7b7a, 0x745e66cd, 0x9823b6e0,
- 0x9ce2ab57, 0x91a18d8e, 0x95609039, 0x8b27c03c, 0x8fe6dd8b, 0x82a5fb52, 0x8664e6e5, 0xbe2b5b58, 0xbaea46ef,
- 0xb7a96036, 0xb3687d81, 0xad2f2d84, 0xa9ee3033, 0xa4ad16ea, 0xa06c0b5d, 0xd4326d90, 0xd0f37027, 0xddb056fe,
- 0xd9714b49, 0xc7361b4c, 0xc3f706fb, 0xceb42022, 0xca753d95, 0xf23a8028, 0xf6fb9d9f, 0xfbb8bb46, 0xff79a6f1,
- 0xe13ef6f4, 0xe5ffeb43, 0xe8bccd9a, 0xec7dd02d, 0x34867077, 0x30476dc0, 0x3d044b19, 0x39c556ae, 0x278206ab,
- 0x23431b1c, 0x2e003dc5, 0x2ac12072, 0x128e9dcf, 0x164f8078, 0x1b0ca6a1, 0x1fcdbb16, 0x018aeb13, 0x054bf6a4,
- 0x0808d07d, 0x0cc9cdca, 0x7897ab07, 0x7c56b6b0, 0x71159069, 0x75d48dde, 0x6b93dddb, 0x6f52c06c, 0x6211e6b5,
- 0x66d0fb02, 0x5e9f46bf, 0x5a5e5b08, 0x571d7dd1, 0x53dc6066, 0x4d9b3063, 0x495a2dd4, 0x44190b0d, 0x40d816ba,
- 0xaca5c697, 0xa864db20, 0xa527fdf9, 0xa1e6e04e, 0xbfa1b04b, 0xbb60adfc, 0xb6238b25, 0xb2e29692, 0x8aad2b2f,
- 0x8e6c3698, 0x832f1041, 0x87ee0df6, 0x99a95df3, 0x9d684044, 0x902b669d, 0x94ea7b2a, 0xe0b41de7, 0xe4750050,
- 0xe9362689, 0xedf73b3e, 0xf3b06b3b, 0xf771768c, 0xfa325055, 0xfef34de2, 0xc6bcf05f, 0xc27dede8, 0xcf3ecb31,
- 0xcbffd686, 0xd5b88683, 0xd1799b34, 0xdc3abded, 0xd8fba05a, 0x690ce0ee, 0x6dcdfd59, 0x608edb80, 0x644fc637,
- 0x7a089632, 0x7ec98b85, 0x738aad5c, 0x774bb0eb, 0x4f040d56, 0x4bc510e1, 0x46863638, 0x42472b8f, 0x5c007b8a,
- 0x58c1663d, 0x558240e4, 0x51435d53, 0x251d3b9e, 0x21dc2629, 0x2c9f00f0, 0x285e1d47, 0x36194d42, 0x32d850f5,
- 0x3f9b762c, 0x3b5a6b9b, 0x0315d626, 0x07d4cb91, 0x0a97ed48, 0x0e56f0ff, 0x1011a0fa, 0x14d0bd4d, 0x19939b94,
- 0x1d528623, 0xf12f560e, 0xf5ee4bb9, 0xf8ad6d60, 0xfc6c70d7, 0xe22b20d2, 0xe6ea3d65, 0xeba91bbc, 0xef68060b,
- 0xd727bbb6, 0xd3e6a601, 0xdea580d8, 0xda649d6f, 0xc423cd6a, 0xc0e2d0dd, 0xcda1f604, 0xc960ebb3, 0xbd3e8d7e,
- 0xb9ff90c9, 0xb4bcb610, 0xb07daba7, 0xae3afba2, 0xaafbe615, 0xa7b8c0cc, 0xa379dd7b, 0x9b3660c6, 0x9ff77d71,
- 0x92b45ba8, 0x9675461f, 0x8832161a, 0x8cf30bad, 0x81b02d74, 0x857130c3, 0x5d8a9099, 0x594b8d2e, 0x5408abf7,
- 0x50c9b640, 0x4e8ee645, 0x4a4ffbf2, 0x470cdd2b, 0x43cdc09c, 0x7b827d21, 0x7f436096, 0x7200464f, 0x76c15bf8,
- 0x68860bfd, 0x6c47164a, 0x61043093, 0x65c52d24, 0x119b4be9, 0x155a565e, 0x18197087, 0x1cd86d30, 0x029f3d35,
- 0x065e2082, 0x0b1d065b, 0x0fdc1bec, 0x3793a651, 0x3352bbe6, 0x3e119d3f, 0x3ad08088, 0x2497d08d, 0x2056cd3a,
- 0x2d15ebe3, 0x29d4f654, 0xc5a92679, 0xc1683bce, 0xcc2b1d17, 0xc8ea00a0, 0xd6ad50a5, 0xd26c4d12, 0xdf2f6bcb,
- 0xdbee767c, 0xe3a1cbc1, 0xe760d676, 0xea23f0af, 0xeee2ed18, 0xf0a5bd1d, 0xf464a0aa, 0xf9278673, 0xfde69bc4,
- 0x89b8fd09, 0x8d79e0be, 0x803ac667, 0x84fbdbd0, 0x9abc8bd5, 0x9e7d9662, 0x933eb0bb, 0x97ffad0c, 0xafb010b1,
- 0xab710d06, 0xa6322bdf, 0xa2f33668, 0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4 };
-
- /** Creates a checksum that is ready to accept bytes. */
- public CRC() {
- initialiseCRC();
- }
-
- /** Resets the running checksum to its start value (all bits set). */
- void initialiseCRC() {
- globalCrc = 0xffffffff;
- }
-
- /** Returns the bitwise complement of the running checksum; state is untouched. */
- int getFinalCRC() {
- return ~globalCrc;
- }
-
- /**
-  * Folds one byte into the running checksum using the lookup table.
-  *
-  * @param inCh the byte to add; presumably 0..255 - verify against callers
-  */
- void updateCRC(int inCh) {
-   int index = (globalCrc >> 24) ^ inCh;
-   if (index < 0) {
-     // the arithmetic shift yields a negative value when the top bit is set
-     index += 256;
-   }
-   globalCrc = (globalCrc << 8) ^ CRC.crc32Table[index];
- }
-
- int globalCrc;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java b/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java
deleted file mode 100644
index 9438014..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.StringTokenizer;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.fn.CompositeMapFn;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-
-/**
- * An abstraction for parsing the lines of a text file using a {@code PType<T>} to
- * convert the lines of text into a given data type.
- *
- * @param <T> The type returned by the text parsing
- */
-abstract class LineParser<T> {
-
- public static <S> LineParser<S> forType(PType<S> ptype) {
- return new SimpleLineParser<S>(ptype);
- }
-
- public static <K, V> LineParser<Pair<K, V>> forTableType(PTableType<K, V> ptt, String sep) {
- return new KeyValueLineParser<K, V>(ptt, sep);
- }
-
- private MapFn<String, T> mapFn;
-
- public void initialize() {
- mapFn = getMapFn();
- mapFn.initialize();
- }
-
- public T parse(String line) {
- return mapFn.map(line);
- }
-
- protected abstract MapFn<String, T> getMapFn();
-
- private static <T> MapFn<String, T> getMapFnForPType(PType<T> ptype) {
- MapFn ret = null;
- if (String.class.equals(ptype.getTypeClass())) {
- ret = (MapFn) IdentityFn.getInstance();
- } else {
- // Check for a composite MapFn for the PType.
- // Note that this won't work for Avro-- need to solve that.
- ret = ptype.getInputMapFn();
- if (ret instanceof CompositeMapFn) {
- ret = ((CompositeMapFn) ret).getSecond();
- }
- }
- return ret;
- }
-
- private static class SimpleLineParser<S> extends LineParser<S> {
-
- private final PType<S> ptype;
-
- public SimpleLineParser(PType<S> ptype) {
- this.ptype = ptype;
- }
-
- @Override
- protected MapFn<String, S> getMapFn() {
- return getMapFnForPType(ptype);
- }
- }
-
- private static class KeyValueLineParser<K, V> extends LineParser<Pair<K, V>> {
-
- private final PTableType<K, V> ptt;
- private final String sep;
-
- public KeyValueLineParser(PTableType<K, V> ptt, String sep) {
- this.ptt = ptt;
- this.sep = sep;
- }
-
- @Override
- protected MapFn<String, Pair<K, V>> getMapFn() {
- final MapFn<String, K> keyMapFn = getMapFnForPType(ptt.getKeyType());
- final MapFn<String, V> valueMapFn = getMapFnForPType(ptt.getValueType());
-
- return new MapFn<String, Pair<K, V>>() {
- @Override
- public void initialize() {
- keyMapFn.initialize();
- valueMapFn.initialize();
- }
-
- @Override
- public Pair<K, V> map(String input) {
- List<String> kv = ImmutableList.copyOf(Splitter.on(sep).limit(1).split(input));
- if (kv.size() != 2) {
- throw new RuntimeException("Invalid input string: " + input);
- }
- return Pair.of(keyMapFn.map(kv.get(0)), valueMapFn.map(kv.get(1)));
- }
- };
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
deleted file mode 100644
index 40e2dbd..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import java.io.IOException;
-
-import org.apache.crunch.io.CompositePathIterable;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.impl.FileSourceImpl;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
-
-/**
- * A {@code Source} instance that uses the {@code NLineInputFormat}, which gives each map
- * task a fraction of the lines in a text file as input. Most useful when running simulations
- * on Hadoop, where each line represents configuration information about each simulation
- * run.
- */
-public class NLineFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
-
- private static FormatBundle getBundle(int linesPerTask) {
- FormatBundle bundle = FormatBundle.forInput(NLineInputFormat.class);
- bundle.set(NLineInputFormat.LINES_PER_MAP, String.valueOf(linesPerTask));
- return bundle;
- }
-
- /**
- * Create a new {@code NLineFileSource} instance.
- *
- * @param path The path to the input data, as a String
- * @param ptype The PType to use for processing the data
- * @param linesPerTask The number of lines from the input each map task will process
- */
- public NLineFileSource(String path, PType<T> ptype, int linesPerTask) {
- this(new Path(path), ptype, linesPerTask);
- }
-
- /**
- * Create a new {@code NLineFileSource} instance.
- *
- * @param path The {@code Path} to the input data
- * @param ptype The PType to use for processing the data
- * @param linesPerTask The number of lines from the input each map task will process
- */
- public NLineFileSource(Path path, PType<T> ptype, int linesPerTask) {
- super(path, ptype, getBundle(linesPerTask));
- }
-
- @Override
- public String toString() {
- return "NLine(" + path + ")";
- }
-
- @Override
- public Iterable<T> read(Configuration conf) throws IOException {
- return CompositePathIterable.create(path.getFileSystem(conf), path,
- new TextFileReaderFactory<T>(LineParser.forType(ptype)));
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
deleted file mode 100644
index e1fea6e..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.io.FileReaderFactory;
-import org.apache.crunch.io.impl.AutoClosingIterator;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
-
-public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
-
- private static final Log LOG = LogFactory.getLog(TextFileReaderFactory.class);
-
- private final LineParser<T> parser;
-
- public TextFileReaderFactory(PType<T> ptype) {
- this(LineParser.forType(ptype));
- }
-
- public TextFileReaderFactory(LineParser<T> parser) {
- this.parser = parser;
- }
-
- @Override
- public Iterator<T> read(FileSystem fs, Path path) {
- parser.initialize();
-
- FSDataInputStream is;
- try {
- is = fs.open(path);
- } catch (IOException e) {
- LOG.info("Could not read path: " + path, e);
- return Iterators.emptyIterator();
- }
-
- final BufferedReader reader = new BufferedReader(new InputStreamReader(is));
- return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
- private String nextLine;
-
- @Override
- public boolean hasNext() {
- try {
- return (nextLine = reader.readLine()) != null;
- } catch (IOException e) {
- LOG.info("Exception reading text file stream", e);
- return false;
- }
- }
-
- @Override
- public T next() {
- return parser.parse(nextLine);
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
deleted file mode 100644
index 026fca9..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import java.io.IOException;
-
-import org.apache.crunch.io.CompositePathIterable;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.impl.FileSourceImpl;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.AvroUtf8InputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
-public class TextFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
-
- private static boolean isBZip2(Path path) {
- String strPath = path.toString();
- return strPath.endsWith(".bz") || strPath.endsWith(".bz2");
- }
-
- private static <S> Class<? extends FileInputFormat<?, ?>> getInputFormat(Path path, PType<S> ptype) {
- if (ptype.getFamily().equals(AvroTypeFamily.getInstance())) {
- return AvroUtf8InputFormat.class;
- } else if (isBZip2(path)) {
- return BZip2TextInputFormat.class;
- } else {
- return TextInputFormat.class;
- }
- }
-
- public TextFileSource(Path path, PType<T> ptype) {
- super(path, ptype, getInputFormat(path, ptype));
- }
-
- @Override
- public long getSize(Configuration conf) {
- long sz = super.getSize(conf);
- if (isBZip2(path)) {
- sz *= 10; // Arbitrary compression factor
- }
- return sz;
- }
-
- @Override
- public String toString() {
- return "Text(" + path + ")";
- }
-
- @Override
- public Iterable<T> read(Configuration conf) throws IOException {
- return CompositePathIterable.create(path.getFileSystem(conf), path,
- new TextFileReaderFactory<T>(LineParser.forType(ptype)));
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
deleted file mode 100644
index 1d1211e..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.SequentialFileNamingScheme;
-import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.Path;
-
-public class TextFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
-
- public TextFileSourceTarget(String path, PType<T> ptype) {
- this(new Path(path), ptype);
- }
-
- public TextFileSourceTarget(Path path, PType<T> ptype) {
- this(path, ptype, new SequentialFileNamingScheme());
- }
-
- public TextFileSourceTarget(Path path, PType<T> ptype, FileNamingScheme fileNamingScheme) {
- super(new TextFileSource<T>(path, ptype), new TextFileTarget(path), fileNamingScheme);
- }
-
- @Override
- public String toString() {
- return target.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
deleted file mode 100644
index 94fc5fd..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import java.io.IOException;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.io.CompositePathIterable;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.impl.FileTableSourceImpl;
-import org.apache.crunch.types.PTableType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
-
-/**
- * A {@code Source} that uses the {@code KeyValueTextInputFormat} to process
- * input text. If a separator for the keys and values in the text file is not specified,
- * a tab character is used.
- */
-public class TextFileTableSource<K, V> extends FileTableSourceImpl<K, V>
- implements ReadableSource<Pair<K, V>> {
-
- // CRUNCH-125: Maintain compatibility with both versions of the KeyValueTextInputFormat's
- // configuration field for specifying the separator character.
- private static final String OLD_KV_SEP = "key.value.separator.in.input.line";
- private static final String NEW_KV_SEP = "mapreduce.input.keyvaluelinerecordreader.key.value.separator";
-
- private static FormatBundle getBundle(String sep) {
- FormatBundle bundle = FormatBundle.forInput(KeyValueTextInputFormat.class);
- bundle.set(OLD_KV_SEP, sep);
- bundle.set(NEW_KV_SEP, sep);
- return bundle;
- }
-
- private final String separator;
-
- public TextFileTableSource(String path, PTableType<K, V> tableType) {
- this(new Path(path), tableType);
- }
-
- public TextFileTableSource(Path path, PTableType<K, V> tableType) {
- this(path, tableType, "\t");
- }
-
- public TextFileTableSource(String path, PTableType<K, V> tableType, String separator) {
- this(new Path(path), tableType, separator);
- }
-
- public TextFileTableSource(Path path, PTableType<K, V> tableType, String separator) {
- super(path, tableType, getBundle(separator));
- this.separator = separator;
- }
-
- @Override
- public String toString() {
- return "KeyValueText(" + path + ")";
- }
-
- @Override
- public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
- return CompositePathIterable.create(path.getFileSystem(conf), path,
- new TextFileReaderFactory<Pair<K, V>>(LineParser.forTableType(getTableType(), separator)));
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java
deleted file mode 100644
index dec97e5..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.TableSourceTarget;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.SequentialFileNamingScheme;
-import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
-import org.apache.crunch.types.PTableType;
-import org.apache.hadoop.fs.Path;
-
-/**
- * A {@code TableSource} and {@code SourceTarget} implementation that uses the
- * {@code KeyValueTextInputFormat} and {@code TextOutputFormat} to support reading
- * and writing text files as {@code PTable} instances using a tab separator for
- * the keys and the values.
- */
-public class TextFileTableSourceTarget<K, V> extends ReadableSourcePathTargetImpl<Pair<K, V>> implements
- TableSourceTarget<K, V> {
-
- private final PTableType<K, V> tableType;
-
- public TextFileTableSourceTarget(String path, PTableType<K, V> tableType) {
- this(new Path(path), tableType);
- }
-
- public TextFileTableSourceTarget(Path path, PTableType<K, V> tableType) {
- this(path, tableType, new SequentialFileNamingScheme());
- }
-
- public TextFileTableSourceTarget(Path path, PTableType<K, V> tableType,
- FileNamingScheme fileNamingScheme) {
- super(new TextFileTableSource<K, V>(path, tableType), new TextFileTarget(path),
- fileNamingScheme);
- this.tableType = tableType;
- }
-
- @Override
- public PTableType<K, V> getTableType() {
- return tableType;
- }
-
- @Override
- public String toString() {
- return target.toString();
- }
-}