Posted to commits@drill.apache.org by ve...@apache.org on 2015/02/06 02:18:27 UTC
[3/3] drill git commit: DRILL-2080: Add IO Stats for JSON/Parquet type input files
DRILL-2080: Add IO Stats for JSON/Parquet type input files
Refactoring: Remove FileSystem shim loader
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c54bd6ac
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c54bd6ac
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c54bd6ac
Branch: refs/heads/master
Commit: c54bd6acb81daa9e1c3ce358e9d3a4ce909f02aa
Parents: 8e8b181
Author: vkorukanti <ve...@gmail.com>
Authored: Mon Dec 15 08:59:25 2014 -0800
Committer: vkorukanti <ve...@gmail.com>
Committed: Thu Feb 5 15:12:37 2015 -0800
----------------------------------------------------------------------
.../drill/exec/dotdrill/DotDrillFile.java | 9 +-
.../drill/exec/dotdrill/DotDrillUtil.java | 6 +-
.../drill/exec/physical/impl/ScanBatch.java | 49 +-
.../planner/sql/handlers/ShowFileHandler.java | 5 +-
.../exec/store/dfs/BasicFormatMatcher.java | 5 +-
.../exec/store/dfs/DrillFSDataInputStream.java | 176 +++++
.../drill/exec/store/dfs/DrillFileSystem.java | 682 +++++++++++++++++++
.../drill/exec/store/dfs/FileSelection.java | 4 +-
.../drill/exec/store/dfs/FileSystemPlugin.java | 4 +-
.../drill/exec/store/dfs/FormatCreator.java | 1 -
.../drill/exec/store/dfs/FormatPlugin.java | 1 -
.../exec/store/dfs/WorkspaceSchemaFactory.java | 14 +-
.../exec/store/dfs/easy/EasyFormatPlugin.java | 15 +-
.../exec/store/dfs/easy/EasyGroupScan.java | 2 +-
.../exec/store/dfs/shim/DrillFileSystem.java | 44 --
.../exec/store/dfs/shim/DrillInputStream.java | 32 -
.../exec/store/dfs/shim/DrillOutputStream.java | 29 -
.../exec/store/dfs/shim/FileSystemCreator.java | 35 -
.../dfs/shim/fallback/FallbackFileSystem.java | 159 -----
.../exec/store/easy/json/JSONFormatPlugin.java | 6 +-
.../exec/store/easy/json/JSONRecordReader.java | 5 +-
.../exec/store/easy/text/TextFormatPlugin.java | 6 +-
.../exec/store/parquet/ParquetFormatPlugin.java | 10 +-
.../exec/store/parquet/ParquetGroupScan.java | 4 +-
.../store/parquet/ParquetScanBatchCreator.java | 11 +-
.../exec/store/parquet2/DrillParquetReader.java | 13 +-
.../drill/exec/store/sys/local/FilePStore.java | 20 +-
.../store/sys/local/LocalPStoreProvider.java | 3 +-
.../drill/exec/store/sys/zk/ZkPStore.java | 10 -
.../exec/store/sys/zk/ZkPStoreProvider.java | 2 +-
.../exec/store/dfs/TestDrillFileSystem.java | 91 +++
31 files changed, 1048 insertions(+), 405 deletions(-)
----------------------------------------------------------------------
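The core mechanism of this change, charging time spent blocked in filesystem reads to the operator's wait-time counter, can be sketched outside of Drill. Below is a minimal plain-Java sketch; WaitTimer is a hypothetical stand-in for Drill's OperatorStats, not its actual API.

import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// WaitTimer: hypothetical stand-in for OperatorStats; accumulates nanoseconds
// spent blocked in reads.
class WaitTimer {
  private long startNanos;
  private long totalWaitNanos;
  void startWait() { startNanos = System.nanoTime(); }
  void stopWait() { totalWaitNanos += System.nanoTime() - startNanos; }
  long totalWaitNanos() { return totalWaitNanos; }
}

// TimedInputStream: brackets each block read with startWait/stopWait, mirroring
// the WrappedInputStream added by this commit.
class TimedInputStream extends FilterInputStream {
  private final WaitTimer timer;

  TimedInputStream(InputStream in, WaitTimer timer) {
    super(in);
    this.timer = timer;
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    timer.startWait();
    try {
      return in.read(b, off, len);
    } finally {
      timer.stopWait(); // the clock stops even if the read throws
    }
  }
}

public class IoWaitSketch {
  public static void main(String[] args) throws IOException {
    WaitTimer timer = new WaitTimer();
    byte[] data = new byte[1 << 20];
    try (InputStream is = new TimedInputStream(new ByteArrayInputStream(data), timer)) {
      byte[] buf = new byte[8192];
      while (is.read(buf, 0, buf.length) != -1) {
        // consume
      }
    }
    System.out.println("IO wait (ns): " + timer.totalWaitNanos());
  }
}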
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
index 009cd00..6a5934b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
@@ -18,12 +18,13 @@
package org.apache.drill.exec.dotdrill;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.shim.DrillInputStream;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.hadoop.fs.FileStatus;
import com.google.common.base.Preconditions;
+import java.io.InputStream;
+
public class DotDrillFile {
private FileStatus status;
@@ -51,8 +52,8 @@ public class DotDrillFile {
public View getView(DrillConfig config) throws Exception{
Preconditions.checkArgument(type == DotDrillType.VIEW);
- try(DrillInputStream is = fs.open(status.getPath())){
- return config.getMapper().readValue(is.getInputStream(), View.class);
+ try(InputStream is = fs.open(status.getPath())){
+ return config.getMapper().readValue(is, View.class);
}
}
}
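With the shim removed, fs.open() hands back a plain InputStream, so try-with-resources can close it directly and Jackson reads from it without an unwrapping step. The same pattern in isolation, assuming jackson-databind on the classpath; Pojo is an illustrative stand-in for Drill's View class:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.databind.ObjectMapper;

public class ReadJsonSketch {
  // Illustrative POJO standing in for Drill's View class.
  public static class Pojo {
    public String name;
  }

  public static void main(String[] args) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    byte[] json = "{\"name\":\"v1\"}".getBytes(StandardCharsets.UTF_8);
    // try-with-resources closes the stream even if readValue throws.
    try (InputStream is = new ByteArrayInputStream(json)) {
      Pojo p = mapper.readValue(is, Pojo.class);
      System.out.println(p.name);
    }
  }
}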
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java
index 63b22e9..cef4359 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.dotdrill;
import java.io.IOException;
import java.util.List;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -50,7 +50,7 @@ public class DotDrillUtil {
}
public static List<DotDrillFile> getDotDrills(DrillFileSystem fs, Path root, DotDrillType... types) throws IOException{
- return getDrillFiles(fs, fs.getUnderlying().globStatus(new Path(root, "*.drill")), types);
+ return getDrillFiles(fs, fs.globStatus(new Path(root, "*.drill")), types);
}
public static List<DotDrillFile> getDotDrills(DrillFileSystem fs, Path root, String name, DotDrillType... types) throws IOException{
@@ -58,6 +58,6 @@ public class DotDrillUtil {
name = name + DotDrillType.DOT_DRILL_GLOB;
}
- return getDrillFiles(fs, fs.getUnderlying().globStatus(new Path(root, name)), types);
+ return getDrillFiles(fs, fs.globStatus(new Path(root, name)), types);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index d68a5b5..6b7294d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -83,17 +83,23 @@ public class ScanBatch implements RecordBatch {
private boolean done = false;
private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
- public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
+ public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, OperatorContext oContext,
+ Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
this.context = context;
this.readers = readers;
if (!readers.hasNext()) {
throw new ExecutionSetupException("A scan batch must contain at least one reader.");
}
this.currentReader = readers.next();
- // Scan Batch is not subject to fragment memory limit
- this.oContext = new OperatorContext(subScanConfig, context, false);
+ this.oContext = oContext;
this.currentReader.setOperatorContext(this.oContext);
- this.currentReader.setup(mutator);
+
+ try {
+ oContext.getStats().startProcessing();
+ this.currentReader.setup(mutator);
+ } finally {
+ oContext.getStats().stopProcessing();
+ }
this.partitionColumns = partitionColumns.iterator();
this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
this.selectedPartitionColumns = selectedPartitionColumns;
@@ -103,7 +109,9 @@ public class ScanBatch implements RecordBatch {
}
public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
- this(subScanConfig, context, readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
+ this(subScanConfig, context,
+ new OperatorContext(subScanConfig, context, false /* ScanBatch is not subject to fragment memory limit */),
+ readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
}
public FragmentContext getContext() {
@@ -166,26 +174,23 @@ public class ScanBatch implements RecordBatch {
}
return IterOutcome.NONE;
}
- oContext.getStats().startSetup();
+
+ currentReader.cleanup();
+ currentReader = readers.next();
+ partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
+ currentReader.setup(mutator);
+ currentReader.setOperatorContext(oContext);
try {
- currentReader.cleanup();
- currentReader = readers.next();
- partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
- currentReader.setup(mutator);
- currentReader.setOperatorContext(oContext);
- try {
- currentReader.allocate(fieldVectorMap);
- } catch (OutOfMemoryException e) {
- logger.debug("Caught OutOfMemoryException");
- for (ValueVector v : fieldVectorMap.values()) {
- v.clear();
- }
- return IterOutcome.OUT_OF_MEMORY;
+ currentReader.allocate(fieldVectorMap);
+ } catch (OutOfMemoryException e) {
+ logger.debug("Caught OutOfMemoryException");
+ for (ValueVector v : fieldVectorMap.values()) {
+ v.clear();
}
- addPartitionVectors();
- } finally {
- oContext.getStats().stopSetup();
+ return IterOutcome.OUT_OF_MEMORY;
}
+ addPartitionVectors();
+
} catch (ExecutionSetupException e) {
this.context.fail(e);
releaseAssets();
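The constructor change above wraps reader setup in a try/finally around startProcessing()/stopProcessing(), so the processing clock stops even when setup throws. The bracketing pattern in isolation; ProcessingStats is hypothetical, not Drill's OperatorStats:

// ProcessingStats: hypothetical stand-in for the operator's processing clock.
class ProcessingStats {
  private long startNanos;
  long processingNanos;
  void startProcessing() { startNanos = System.nanoTime(); }
  void stopProcessing() { processingNanos += System.nanoTime() - startNanos; }
}

public class SetupBracketSketch {
  static void setupWithStats(ProcessingStats stats, Runnable setup) {
    stats.startProcessing();
    try {
      setup.run(); // may throw; the finally still stops the clock
    } finally {
      stats.stopProcessing();
    }
  }

  public static void main(String[] args) {
    ProcessingStats stats = new ProcessingStats();
    setupWithStats(stats, new Runnable() {
      @Override
      public void run() {
        // simulate currentReader.setup(mutator)
      }
    });
    System.out.println("processing (ns): " + stats.processingNanos);
  }
}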
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
index ff3542d..2504ed9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
@@ -22,20 +22,17 @@ import java.util.ArrayList;
import java.util.List;
import net.hydromatic.optiq.SchemaPlus;
-import net.hydromatic.optiq.tools.Planner;
import net.hydromatic.optiq.tools.RelConversionException;
import net.hydromatic.optiq.tools.ValidationException;
-import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.parser.SqlShowFiles;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.eigenbase.relopt.hep.HepPlanner;
import org.eigenbase.sql.SqlIdentifier;
import org.eigenbase.sql.SqlNode;
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
index 2ba2910..9756f3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -64,7 +63,7 @@ public class BasicFormatMatcher extends FormatMatcher{
this.fs = fs;
this.plugin = plugin;
this.compressible = compressible;
- this.codecFactory = new CompressionCodecFactory(fs.getUnderlying().getConf());
+ this.codecFactory = new CompressionCodecFactory(fs.getConf());
}
@Override
@@ -135,7 +134,7 @@ public class BasicFormatMatcher extends FormatMatcher{
}
final Range<Long> fileRange = Range.closedOpen( 0L, status.getLen());
- try (FSDataInputStream is = fs.open(status.getPath()).getInputStream()) {
+ try (FSDataInputStream is = fs.open(status.getPath())) {
for(RangeMagics rMagic : ranges) {
Range<Long> r = rMagic.range;
if (!fileRange.encloses(r)) {
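With the shim gone, the matcher opens the FSDataInputStream directly for its magic-number probe. The probe itself, reading a file's leading bytes and comparing them to a known signature, looks like this in isolation (the 4-byte Parquet magic "PAR1" is real; the rest of the sketch is illustrative):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

public class MagicNumberSketch {
  // Parquet files begin (and end) with the 4-byte magic "PAR1".
  private static final byte[] PARQUET_MAGIC = {'P', 'A', 'R', '1'};

  static boolean matchesMagic(InputStream in, byte[] magic) throws IOException {
    byte[] head = new byte[magic.length];
    new DataInputStream(in).readFully(head); // throws EOFException on short files
    return Arrays.equals(head, magic);
  }

  public static void main(String[] args) throws IOException {
    byte[] fakeFile = {'P', 'A', 'R', '1', 0, 0, 0};
    try (InputStream in = new ByteArrayInputStream(fakeFile)) {
      System.out.println("parquet? " + matchesMagic(in, PARQUET_MAGIC));
    }
  }
}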
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
new file mode 100644
index 0000000..44ef8a3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+
+/**
+ * Wrapper around FSDataInputStream to collect IO Stats.
+ */
+public class DrillFSDataInputStream extends FSDataInputStream {
+ private FSDataInputStream underlyingIs;
+
+ public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats) throws IOException {
+ super(new WrappedInputStream(in, operatorStats));
+ this.underlyingIs = in;
+ }
+
+ @Override
+ public synchronized void seek(long desired) throws IOException {
+ underlyingIs.seek(desired);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return underlyingIs.getPos();
+ }
+
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+ return underlyingIs.read(position, buffer, offset, length);
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+ underlyingIs.readFully(position, buffer, offset, length);
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer) throws IOException {
+ underlyingIs.readFully(position, buffer);
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return underlyingIs.seekToNewSource(targetPos);
+ }
+
+ @Override
+ public InputStream getWrappedStream() {
+ return underlyingIs.getWrappedStream();
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ return underlyingIs.read(buf);
+ }
+
+ @Override
+ public FileDescriptor getFileDescriptor() throws IOException {
+ return underlyingIs.getFileDescriptor();
+ }
+
+ @Override
+ public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
+ underlyingIs.setReadahead(readahead);
+ }
+
+ @Override
+ public void setDropBehind(Boolean dropBehind) throws IOException, UnsupportedOperationException {
+ underlyingIs.setDropBehind(dropBehind);
+ }
+
+ @Override
+ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
+ return underlyingIs.read(bufferPool, maxLength, opts);
+ }
+
+ @Override
+ public void releaseBuffer(ByteBuffer buffer) {
+ underlyingIs.releaseBuffer(buffer);
+ }
+
+ /**
+ * We need to wrap the FSDataInputStream inside an InputStream because the byte-array read() methods are
+ * declared final in DataInputStream (a superclass of FSDataInputStream), so we cannot override them in
+ * DrillFSDataInputStream.
+ */
+ private static class WrappedInputStream extends InputStream implements Seekable, PositionedReadable {
+ final FSDataInputStream is;
+ final OperatorStats operatorStats;
+
+ WrappedInputStream(FSDataInputStream is, OperatorStats operatorStats) {
+ this.is = is;
+ this.operatorStats = operatorStats;
+ }
+
+ /**
+ * Most of the reads are going to be block reads, which use {@link #read(byte[], int, int)},
+ * so we do not add stats for single-byte reads.
+ */
+ @Override
+ public int read() throws IOException {
+ return is.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ operatorStats.startWait();
+ int numBytesRead;
+ try {
+ numBytesRead = is.read(b, off, len);
+ } finally {
+ operatorStats.stopWait();
+ }
+
+ return numBytesRead;
+ }
+
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+ return is.read(position, buffer, offset, length);
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+ is.readFully(position, buffer, offset, length);
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer) throws IOException {
+ is.readFully(position, buffer);
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ is.seek(pos);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return is.getPos();
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return is.seekToNewSource(targetPos);
+ }
+ }
+}
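The need for WrappedInputStream can be demonstrated in isolation: java.io.DataInputStream (a superclass of FSDataInputStream) declares its byte-array read methods final, so interception has to live in the stream passed to the constructor, which those final methods delegate to. A small demonstration; CountingStream is hypothetical:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Intercepts block reads in the *wrapped* stream, since DataInputStream's
// read(byte[], int, int) is final and cannot be overridden in a subclass.
class CountingStream extends FilterInputStream {
  long bytesRead;

  CountingStream(InputStream in) { super(in); }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    int n = in.read(b, off, len);
    if (n > 0) {
      bytesRead += n; // interception point: stats would be recorded here
    }
    return n;
  }
}

public class FinalReadWorkaround {
  public static void main(String[] args) throws IOException {
    CountingStream counting = new CountingStream(new ByteArrayInputStream(new byte[100]));
    // DataInputStream's final read methods delegate to the wrapped stream,
    // so the interception in CountingStream still fires.
    try (DataInputStream dis = new DataInputStream(counting)) {
      dis.readFully(new byte[100]);
    }
    System.out.println("bytes observed by wrapper: " + counting.bytesRead);
  }
}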
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
new file mode 100644
index 0000000..f5730a1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -0,0 +1,682 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.EnumSet;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * DrillFileSystem is the wrapper around the actual FileSystem implementation.
+ *
+ * If {@link org.apache.drill.exec.ops.OperatorStats} are provided, it returns an instrumented
+ * FSDataInputStream to measure IO wait time.
+ */
+public class DrillFileSystem extends FileSystem {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class);
+
+ private final FileSystem underlyingFs;
+ private final OperatorStats operatorStats;
+
+ public DrillFileSystem(Configuration fsConf) throws IOException {
+ this.underlyingFs = FileSystem.get(fsConf);
+ this.operatorStats = null;
+ }
+
+ public DrillFileSystem(FileSystem fs, OperatorStats operatorStats) {
+ this.underlyingFs = fs;
+ this.operatorStats = operatorStats;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ // Guard against the setConf(null) call made from the superclass (Configured) constructor of
+ // DrillFileSystem, at which point underlyingFs is still null.
+ if (conf != null && underlyingFs != null) {
+ underlyingFs.setConf(conf);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return underlyingFs.getConf();
+ }
+
+ /**
+ * If OperatorStats are provided, return an instrumented {@link org.apache.hadoop.fs.FSDataInputStream}.
+ */
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ if (operatorStats != null) {
+ return new DrillFSDataInputStream(underlyingFs.open(f, bufferSize), operatorStats);
+ }
+
+ return underlyingFs.open(f, bufferSize);
+ }
+
+ /**
+ * If OperatorStats are provided, return an instrumented {@link org.apache.hadoop.fs.FSDataInputStream}.
+ */
+ @Override
+ public FSDataInputStream open(Path f) throws IOException {
+ if (operatorStats != null) {
+ return new DrillFSDataInputStream(underlyingFs.open(f), operatorStats);
+ }
+
+ return underlyingFs.open(f);
+ }
+
+ @Override
+ public void initialize(URI name, Configuration conf) throws IOException {
+ underlyingFs.initialize(name, conf);
+ }
+
+ @Override
+ public String getScheme() {
+ return underlyingFs.getScheme();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f) throws IOException {
+ return underlyingFs.create(f);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
+ return underlyingFs.create(f, overwrite);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
+ return underlyingFs.create(f, progress);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, short replication) throws IOException {
+ return underlyingFs.create(f, replication);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, short replication, Progressable progress) throws IOException {
+ return underlyingFs.create(f, replication, progress);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException {
+ return underlyingFs.create(f, overwrite, bufferSize);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress) throws IOException {
+ return underlyingFs.create(f, overwrite, bufferSize, progress);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
+ long blockSize) throws IOException {
+ return underlyingFs.create(f, overwrite, bufferSize, replication, blockSize);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+ return underlyingFs.create(f, overwrite, bufferSize, replication, blockSize, progress);
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ return underlyingFs.getFileStatus(f);
+ }
+
+ @Override
+ public void createSymlink(Path target, Path link, boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnsupportedFileSystemException, IOException {
+ underlyingFs.createSymlink(target, link, createParent);
+ }
+
+ @Override
+ public FileStatus getFileLinkStatus(Path f) throws AccessControlException, FileNotFoundException,
+ UnsupportedFileSystemException, IOException {
+ return underlyingFs.getFileLinkStatus(f);
+ }
+
+ @Override
+ public boolean supportsSymlinks() {
+ return underlyingFs.supportsSymlinks();
+ }
+
+ @Override
+ public Path getLinkTarget(Path f) throws IOException {
+ return underlyingFs.getLinkTarget(f);
+ }
+
+ @Override
+ public FileChecksum getFileChecksum(Path f) throws IOException {
+ return underlyingFs.getFileChecksum(f);
+ }
+
+ @Override
+ public void setVerifyChecksum(boolean verifyChecksum) {
+ underlyingFs.setVerifyChecksum(verifyChecksum);
+ }
+
+ @Override
+ public void setWriteChecksum(boolean writeChecksum) {
+ underlyingFs.setWriteChecksum(writeChecksum);
+ }
+
+ @Override
+ public FsStatus getStatus() throws IOException {
+ return underlyingFs.getStatus();
+ }
+
+ @Override
+ public FsStatus getStatus(Path p) throws IOException {
+ return underlyingFs.getStatus(p);
+ }
+
+ @Override
+ public void setPermission(Path p, FsPermission permission) throws IOException {
+ underlyingFs.setPermission(p, permission);
+ }
+
+ @Override
+ public void setOwner(Path p, String username, String groupname) throws IOException {
+ underlyingFs.setOwner(p, username, groupname);
+ }
+
+ @Override
+ public void setTimes(Path p, long mtime, long atime) throws IOException {
+ underlyingFs.setTimes(p, mtime, atime);
+ }
+
+ @Override
+ public Path createSnapshot(Path path, String snapshotName) throws IOException {
+ return underlyingFs.createSnapshot(path, snapshotName);
+ }
+
+ @Override
+ public void renameSnapshot(Path path, String snapshotOldName, String snapshotNewName) throws IOException {
+ underlyingFs.renameSnapshot(path, snapshotOldName, snapshotNewName);
+ }
+
+ @Override
+ public void deleteSnapshot(Path path, String snapshotName) throws IOException {
+ underlyingFs.deleteSnapshot(path, snapshotName);
+ }
+
+ @Override
+ public void modifyAclEntries(Path path, List<AclEntry> aclSpec) throws IOException {
+ underlyingFs.modifyAclEntries(path, aclSpec);
+ }
+
+ @Override
+ public void removeAclEntries(Path path, List<AclEntry> aclSpec) throws IOException {
+ underlyingFs.removeAclEntries(path, aclSpec);
+ }
+
+ @Override
+ public void removeDefaultAcl(Path path) throws IOException {
+ underlyingFs.removeDefaultAcl(path);
+ }
+
+ @Override
+ public void removeAcl(Path path) throws IOException {
+ underlyingFs.removeAcl(path);
+ }
+
+ @Override
+ public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
+ underlyingFs.setAcl(path, aclSpec);
+ }
+
+ @Override
+ public AclStatus getAclStatus(Path path) throws IOException {
+ return underlyingFs.getAclStatus(path);
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return underlyingFs.getWorkingDirectory();
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+ return underlyingFs.append(f, bufferSize, progress);
+ }
+
+ @Override
+ public void concat(Path trg, Path[] psrcs) throws IOException {
+ underlyingFs.concat(trg, psrcs);
+ }
+
+ @Override
+ @Deprecated
+ public short getReplication(Path src) throws IOException {
+ return underlyingFs.getReplication(src);
+ }
+
+ @Override
+ public boolean setReplication(Path src, short replication) throws IOException {
+ return underlyingFs.setReplication(src, replication);
+ }
+
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ return underlyingFs.mkdirs(f, permission);
+ }
+
+ @Override
+ public void copyFromLocalFile(Path src, Path dst) throws IOException {
+ underlyingFs.copyFromLocalFile(src, dst);
+ }
+
+ @Override
+ public void moveFromLocalFile(Path[] srcs, Path dst) throws IOException {
+ underlyingFs.moveFromLocalFile(srcs, dst);
+ }
+
+ @Override
+ public void moveFromLocalFile(Path src, Path dst) throws IOException {
+ underlyingFs.moveFromLocalFile(src, dst);
+ }
+
+ @Override
+ public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
+ underlyingFs.copyFromLocalFile(delSrc, src, dst);
+ }
+
+ @Override
+ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path[] srcs, Path dst) throws IOException {
+ underlyingFs.copyFromLocalFile(delSrc, overwrite, srcs, dst);
+ }
+
+ @Override
+ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException {
+ underlyingFs.copyFromLocalFile(delSrc, overwrite, src, dst);
+ }
+
+ @Override
+ public void copyToLocalFile(Path src, Path dst) throws IOException {
+ underlyingFs.copyToLocalFile(src, dst);
+ }
+
+ @Override
+ public void moveToLocalFile(Path src, Path dst) throws IOException {
+ underlyingFs.moveToLocalFile(src, dst);
+ }
+
+ @Override
+ public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
+ underlyingFs.copyToLocalFile(delSrc, src, dst);
+ }
+
+ @Override
+ public void copyToLocalFile(boolean delSrc, Path src, Path dst, boolean useRawLocalFileSystem) throws IOException {
+ underlyingFs.copyToLocalFile(delSrc, src, dst, useRawLocalFileSystem);
+ }
+
+ @Override
+ public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) throws IOException {
+ return underlyingFs.startLocalOutput(fsOutputFile, tmpLocalFile);
+ }
+
+ @Override
+ public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) throws IOException {
+ underlyingFs.completeLocalOutput(fsOutputFile, tmpLocalFile);
+ }
+
+ @Override
+ public void close() throws IOException {
+ underlyingFs.close();
+ }
+
+ @Override
+ public long getUsed() throws IOException {
+ return underlyingFs.getUsed();
+ }
+
+ @Override
+ @Deprecated
+ public long getBlockSize(Path f) throws IOException {
+ return underlyingFs.getBlockSize(f);
+ }
+
+ @Override
+ @Deprecated
+ public long getDefaultBlockSize() {
+ return underlyingFs.getDefaultBlockSize();
+ }
+
+ @Override
+ public long getDefaultBlockSize(Path f) {
+ return underlyingFs.getDefaultBlockSize(f);
+ }
+
+ @Override
+ @Deprecated
+ public short getDefaultReplication() {
+ return underlyingFs.getDefaultReplication();
+ }
+
+ @Override
+ public short getDefaultReplication(Path path) {
+ return underlyingFs.getDefaultReplication(path);
+ }
+
+ @Override
+ public boolean mkdirs(Path folderPath) throws IOException {
+ if (!underlyingFs.exists(folderPath)) {
+ return underlyingFs.mkdirs(folderPath);
+ } else if (!underlyingFs.getFileStatus(folderPath).isDir()) {
+ throw new IOException("The specified folder path exists and is not a folder.");
+ }
+ return false;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize,
+ short replication, long blockSize, Progressable progress, ChecksumOpt checksumOpt) throws IOException {
+ return underlyingFs.create(f, permission, flags, bufferSize, replication, blockSize, progress, checksumOpt);
+ }
+
+ @Override
+ @Deprecated
+ public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize, short replication,
+ long blockSize, Progressable progress) throws IOException {
+ return underlyingFs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, progress);
+ }
+
+ @Override
+ @Deprecated
+ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, boolean overwrite, int bufferSize,
+ short replication, long blockSize, Progressable progress) throws IOException {
+ return underlyingFs.createNonRecursive(f, permission, overwrite, bufferSize, replication, blockSize, progress);
+ }
+
+ @Override
+ @Deprecated
+ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+ return underlyingFs.createNonRecursive(f, permission, flags, bufferSize, replication, blockSize, progress);
+ }
+
+ @Override
+ public boolean createNewFile(Path f) throws IOException {
+ return underlyingFs.createNewFile(f);
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f) throws IOException {
+ return underlyingFs.append(f);
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
+ return underlyingFs.append(f, bufferSize);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short
+ replication, long blockSize, Progressable progress) throws IOException {
+ return underlyingFs.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize,
+ short replication, long blockSize, Progressable progress) throws IOException {
+ return underlyingFs.create(f, permission, flags, bufferSize, replication, blockSize, progress);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+ return underlyingFs.listStatus(f);
+ }
+
+ @Override
+ public RemoteIterator<Path> listCorruptFileBlocks(Path path) throws IOException {
+ return underlyingFs.listCorruptFileBlocks(path);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f, PathFilter filter) throws FileNotFoundException, IOException {
+ return underlyingFs.listStatus(f, filter);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path[] files) throws FileNotFoundException, IOException {
+ return underlyingFs.listStatus(files);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path[] files, PathFilter filter) throws FileNotFoundException, IOException {
+ return underlyingFs.listStatus(files, filter);
+ }
+
+ @Override
+ public FileStatus[] globStatus(Path pathPattern) throws IOException {
+ return underlyingFs.globStatus(pathPattern);
+ }
+
+ @Override
+ public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException {
+ return underlyingFs.globStatus(pathPattern, filter);
+ }
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) throws FileNotFoundException, IOException {
+ return underlyingFs.listLocatedStatus(f);
+ }
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws FileNotFoundException, IOException {
+ return underlyingFs.listFiles(f, recursive);
+ }
+
+ @Override
+ public Path getHomeDirectory() {
+ return underlyingFs.getHomeDirectory();
+ }
+
+ @Override
+ public void setWorkingDirectory(Path new_dir) {
+ underlyingFs.setWorkingDirectory(new_dir);
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ return underlyingFs.rename(src, dst);
+ }
+
+ @Override
+ @Deprecated
+ public boolean delete(Path f) throws IOException {
+ return underlyingFs.delete(f);
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ return underlyingFs.delete(f, recursive);
+ }
+
+ @Override
+ public boolean deleteOnExit(Path f) throws IOException {
+ return underlyingFs.deleteOnExit(f);
+ }
+
+ @Override
+ public boolean cancelDeleteOnExit(Path f) {
+ return underlyingFs.cancelDeleteOnExit(f);
+ }
+
+ @Override
+ public boolean exists(Path f) throws IOException {
+ return underlyingFs.exists(f);
+ }
+
+ @Override
+ public boolean isDirectory(Path f) throws IOException {
+ return underlyingFs.isDirectory(f);
+ }
+
+ @Override
+ public boolean isFile(Path f) throws IOException {
+ return underlyingFs.isFile(f);
+ }
+
+ @Override
+ @Deprecated
+ public long getLength(Path f) throws IOException {
+ return underlyingFs.getLength(f);
+ }
+
+ @Override
+ public ContentSummary getContentSummary(Path f) throws IOException {
+ return underlyingFs.getContentSummary(f);
+ }
+
+ @Override
+ public URI getUri() {
+ return underlyingFs.getUri();
+ }
+
+ @Override
+ @LimitedPrivate({"HDFS", "MapReduce"})
+ public String getCanonicalServiceName() {
+ return underlyingFs.getCanonicalServiceName();
+ }
+
+ @Override
+ @Deprecated
+ public String getName() {
+ return underlyingFs.getName();
+ }
+
+ @Override
+ public Path makeQualified(Path path) {
+ return underlyingFs.makeQualified(path);
+ }
+
+ @Override
+ @Private
+ public Token<?> getDelegationToken(String renewer) throws IOException {
+ return underlyingFs.getDelegationToken(renewer);
+ }
+
+ @Override
+ @LimitedPrivate({"HDFS", "MapReduce"})
+ public Token<?>[] addDelegationTokens(String renewer, Credentials credentials) throws IOException {
+ return underlyingFs.addDelegationTokens(renewer, credentials);
+ }
+
+ @Override
+ @LimitedPrivate({"HDFS"})
+ @VisibleForTesting
+ public FileSystem[] getChildFileSystems() {
+ return underlyingFs.getChildFileSystems();
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+ return underlyingFs.getFileBlockLocations(file, start, len);
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(Path p, long start, long len) throws IOException {
+ return underlyingFs.getFileBlockLocations(p, start, len);
+ }
+
+ @Override
+ @Deprecated
+ public FsServerDefaults getServerDefaults() throws IOException {
+ return underlyingFs.getServerDefaults();
+ }
+
+ @Override
+ public FsServerDefaults getServerDefaults(Path p) throws IOException {
+ return underlyingFs.getServerDefaults(p);
+ }
+
+ @Override
+ public Path resolvePath(Path p) throws IOException {
+ return underlyingFs.resolvePath(p);
+ }
+
+ public List<FileStatus> list(boolean recursive, Path... paths) throws IOException {
+ if (recursive) {
+ List<FileStatus> statuses = Lists.newArrayList();
+ for (Path p : paths) {
+ addRecursiveStatus(underlyingFs.getFileStatus(p), statuses);
+ }
+ return statuses;
+
+ } else {
+ return Lists.newArrayList(underlyingFs.listStatus(paths));
+ }
+ }
+
+
+ private void addRecursiveStatus(FileStatus parent, List<FileStatus> listToFill) throws IOException {
+ if (parent.isDir()) {
+ Path pattern = new Path(parent.getPath(), "*");
+ FileStatus[] sub = underlyingFs.globStatus(pattern, new DrillPathFilter());
+ for(FileStatus s : sub){
+ if (s.isDir()) {
+ addRecursiveStatus(s, listToFill);
+ } else {
+ listToFill.add(s);
+ }
+ }
+ } else {
+ listToFill.add(parent);
+ }
+ }
+
+
+}
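The diff shows two ways to build the wrapper: from a Configuration with no stats (as FileSystemPlugin now does), and around an existing FileSystem plus OperatorStats (as EasyFormatPlugin does below). A hedged sketch of the stats-less path against the local filesystem; the file path is illustrative:

import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DrillFsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); // local filesystem for illustration
    // Without OperatorStats, open() returns the underlying stream unwrapped.
    DrillFileSystem dfs = new DrillFileSystem(conf);
    try (FSDataInputStream in = dfs.open(new Path("/tmp/example.json"))) { // hypothetical path
      byte[] buf = new byte[1024];
      int n = in.read(buf, 0, buf.length);
      System.out.println("read " + n + " bytes");
    } finally {
      dfs.close();
    }
  }
}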
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index cf8937f..b9ae303 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -23,12 +23,10 @@ import java.util.List;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
@@ -136,7 +134,7 @@ public class FileSelection {
return new FileSelection(Collections.singletonList(status), p.toUri().getPath());
} else {
Path p = new Path(parent,removeLeadingSlash(path));
- FileStatus[] status = fs.getUnderlying().globStatus(p);
+ FileStatus[] status = fs.globStatus(p);
if (status == null || status.length == 0) {
return null;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index db6c0c7..7a1f61e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -36,8 +36,6 @@ import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.ClassPathFileSystem;
import org.apache.drill.exec.store.LocalSyncableFileSystem;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.shim.FileSystemCreator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -71,7 +69,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.connection);
fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
- fs = FileSystemCreator.getFileSystem(context.getConfig(), fsConf);
+ fs = new DrillFileSystem(fsConf);
formatPluginsByName = FormatCreator.getFormatPlugins(context, fs, config);
List<FormatMatcher> matchers = Lists.newArrayList();
formatPluginsByConfig = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
index e5c0487..c164ed5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
@@ -29,7 +29,6 @@ import org.apache.drill.common.util.ConstructorChecker;
import org.apache.drill.common.util.PathScanner;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import com.google.common.collect.Maps;
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 27f83f0..58d5b42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
/**
* Similar to a storage engine but built specifically to work within a FileSystem context.
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 7b9d52c..7c8d9b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.dfs;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -28,7 +29,6 @@ import com.google.common.collect.ImmutableList;
import net.hydromatic.optiq.Table;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.dotdrill.DotDrillFile;
@@ -43,8 +43,6 @@ import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry;
import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.shim.DrillOutputStream;
import org.apache.drill.exec.store.sys.PStore;
import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.drill.exec.store.sys.PStoreProvider;
@@ -172,9 +170,9 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
public boolean createView(View view) throws Exception {
Path viewPath = getViewPath(view.getName());
- boolean replaced = fs.getUnderlying().exists(viewPath);
- try (DrillOutputStream stream = fs.create(viewPath)) {
- mapper.writeValue(stream.getOuputStream(), view);
+ boolean replaced = fs.exists(viewPath);
+ try (OutputStream stream = fs.create(viewPath)) {
+ mapper.writeValue(stream, view);
}
if (knownViews != null) {
knownViews.put(view.getName(), viewPath.toString());
@@ -184,11 +182,11 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
public boolean viewExists(String viewName) throws Exception {
Path viewPath = getViewPath(viewName);
- return fs.getUnderlying().exists(viewPath);
+ return fs.exists(viewPath);
}
public void dropView(String viewName) throws IOException {
- fs.getUnderlying().delete(getViewPath(viewName), false);
+ fs.delete(getViewPath(viewName), false);
if (knownViews != null) {
knownViews.delete(viewName);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 9cc1808..431b362 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -30,6 +30,7 @@ import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -45,7 +46,7 @@ import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -79,7 +80,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
this.storageConfig = storageConfig;
this.formatConfig = formatConfig;
this.name = name == null ? defaultName : name;
- this.codecFactory = new CompressionCodecFactory(new Configuration(fs.getUnderlying().getConf()));
+ this.codecFactory = new CompressionCodecFactory(new Configuration(fs.getConf()));
}
@Override
@@ -113,7 +114,8 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
return compressible;
}
- public abstract RecordReader getRecordReader(FragmentContext context, FileWork fileWork, List<SchemaPath> columns) throws ExecutionSetupException;
+ public abstract RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
+ List<SchemaPath> columns) throws ExecutionSetupException;
RecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
String partitionDesignator = context.getConfig().getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
@@ -147,8 +149,11 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
}
int numParts = 0;
+ OperatorContext oContext = new OperatorContext(scan, context,
+ false /* ScanBatch is not subject to fragment memory limit */);
+ DrillFileSystem dfs = new DrillFileSystem(fs, oContext.getStats());
for(FileWork work : scan.getWorkUnits()){
- readers.add(getRecordReader(context, work, scan.getColumns()));
+ readers.add(getRecordReader(context, dfs, work, scan.getColumns()));
if (scan.getSelectionRoot() != null) {
String[] r = scan.getSelectionRoot().split("/");
String[] p = work.getPath().split("/");
@@ -170,7 +175,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
}
}
- return new ScanBatch(scan, context, readers.iterator(), partitionColumns, selectedPartitionColumns);
+ return new ScanBatch(scan, context, oContext, readers.iterator(), partitionColumns, selectedPartitionColumns);
}
public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException;
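Concrete format plugins now implement the widened getRecordReader signature and open files through the DrillFileSystem they are handed, so every block read is charged to the scan's OperatorStats. A hypothetical subclass override, shown as a fragment rather than a runnable program; only the signature comes from the diff, and MyFormatRecordReader is illustrative:

// Hypothetical override in an EasyFormatPlugin subclass.
@Override
public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
    List<SchemaPath> columns) throws ExecutionSetupException {
  // The reader keeps dfs and opens fileWork's path through it during setup,
  // so its IO flows through the instrumented stream.
  return new MyFormatRecordReader(context, dfs, fileWork, columns); // illustrative reader class
}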
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index b505535..9902443 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -110,7 +110,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException {
this.selection = selection;
- BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits());
+ BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem(), formatPlugin.getContext().getBits());
this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
this.maxWidth = chunks.size();
this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java
deleted file mode 100644
index d3f9134..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.dfs.shim;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Wraps the underlying filesystem to provide advanced file system features. Delegates to underlying file system if
- * those features are exposed.
- */
-public abstract class DrillFileSystem implements AutoCloseable{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class);
-
- public abstract FileSystem getUnderlying();
-
- public abstract BlockLocation[] getBlockLocations(FileStatus status, long start, long length) throws IOException;
- public abstract List<FileStatus> list(boolean recursive, Path... paths) throws IOException;
- public abstract FileStatus getFileStatus(Path p) throws IOException;
- public abstract DrillOutputStream create(Path p) throws IOException;
- public abstract DrillInputStream open(Path p) throws IOException;
- public abstract boolean mkdirs(Path f) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillInputStream.java
deleted file mode 100644
index 8c3b5ae..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillInputStream.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.dfs.shim;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-
-public abstract class DrillInputStream implements AutoCloseable{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillInputStream.class);
-
-// public abstract AccountingByteBuf readNow(long start, long length) throws IOException;
-// public abstract void readNow(AccountingByteBuf b, long start, long length) throws IOException;
-// public abstract AccountingByteBuf readNow() throws IOException;
-
- public abstract FSDataInputStream getInputStream();
-// public abstract CheckedFuture<Long, IOException> readFuture(AccountingByteBuf b, long start, long length) throws IOException;
-
-}
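
With this wrapper gone, readers work against plain FSDataInputStream again. The commit also adds a DrillFSDataInputStream (see the file list), and given the JIRA title the point of such a stream is presumably to account for what flows through it. A minimal sketch of that idea, assuming a bare counter rather than Drill's actual stats API:

    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative only: counts bytes as they are read, the kind of hook
    // an instrumented stream needs in order to report IO statistics.
    public class CountingInputStream extends FilterInputStream {
      private final AtomicLong bytesRead;

      public CountingInputStream(InputStream in, AtomicLong bytesRead) {
        super(in);
        this.bytesRead = bytesRead;
      }

      @Override
      public int read() throws IOException {
        int b = in.read();
        if (b != -1) {
          bytesRead.incrementAndGet();
        }
        return b;
      }

      @Override
      public int read(byte[] buf, int off, int len) throws IOException {
        int n = in.read(buf, off, len);
        if (n > 0) {
          bytesRead.addAndGet(n);
        }
        return n;
      }
    }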
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillOutputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillOutputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillOutputStream.java
deleted file mode 100644
index 8e56232..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillOutputStream.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.dfs.shim;
-
-import java.io.OutputStream;
-
-
-public abstract class DrillOutputStream implements AutoCloseable{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillOutputStream.class);
-
- public abstract OutputStream getOutputStream();
-// public abstract CheckedFuture<Long, IOException> writeFuture(AccountingByteBuf b);
-
-}
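
The write-side wrapper goes the same way: instead of fs.create(p).getOutputStream(), callers hold the Hadoop stream directly in try-with-resources, as the FilePStore hunk later in this diff does. A one-line sketch, assuming fs is a Hadoop FileSystem and payload a byte[] to persist:

    // Sketch: write through the Hadoop API directly, no shim accessor.
    try (java.io.OutputStream os = fs.create(new org.apache.hadoop.fs.Path("/tmp/example"))) {
      os.write(payload);
    }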
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/FileSystemCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/FileSystemCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/FileSystemCreator.java
deleted file mode 100644
index a5ad257..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/FileSystemCreator.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.dfs.shim;
-
-import java.io.IOException;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.store.dfs.shim.fallback.FallbackFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-
-public class FileSystemCreator {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemCreator.class);
-
- public static DrillFileSystem getFileSystem(DrillConfig config, Configuration fsConf) throws IOException{
- FileSystem fs = FileSystem.get(fsConf);
- return new FallbackFileSystem(config, fs);
- }
-
-}
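
The static factory disappears along with the shim it created. Judging by the FilePStore hunk later in this diff, the replacement DrillFileSystem is constructed straight from a Hadoop Configuration; the call sites change roughly like this:

    // Before:
    //   DrillFileSystem fs = FileSystemCreator.getFileSystem(config, fsConf);
    //   fs.getUnderlying().mkdirs(blobRoot);
    // After (as the FilePStore hunk shows):
    DrillFileSystem fs = new DrillFileSystem(fsConf);
    fs.mkdirs(blobRoot);  // usable directly as a Hadoop FileSystem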
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
deleted file mode 100644
index 959529a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.dfs.shim.fallback;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.store.dfs.DrillPathFilter;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.shim.DrillInputStream;
-import org.apache.drill.exec.store.dfs.shim.DrillOutputStream;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Lists;
-
-public class FallbackFileSystem extends DrillFileSystem {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FallbackFileSystem.class);
-
- final FileSystem fs;
-
- public FallbackFileSystem(DrillConfig config, FileSystem fs) {
- this.fs = fs;
- }
-
- @Override
- public FileSystem getUnderlying() {
- return fs;
- }
-
- @Override
- public List<FileStatus> list(boolean recursive, Path... paths) throws IOException {
- if (recursive) {
- List<FileStatus> statuses = Lists.newArrayList();
- for (Path p : paths) {
- addRecursiveStatus(fs.getFileStatus(p), statuses);
- }
- return statuses;
-
- } else {
- return Lists.newArrayList(fs.listStatus(paths));
- }
- }
-
-
- private void addRecursiveStatus(FileStatus parent, List<FileStatus> listToFill) throws IOException {
- if (parent.isDir()) {
- Path pattern = new Path(parent.getPath(), "*");
- FileStatus[] sub = fs.globStatus(pattern, new DrillPathFilter());
- for(FileStatus s : sub){
- if (s.isDir()) {
- addRecursiveStatus(s, listToFill);
- } else {
- listToFill.add(s);
- }
- }
- } else {
- listToFill.add(parent);
- }
- }
-
- @Override
- public FileStatus getFileStatus(Path p) throws IOException {
- return fs.getFileStatus(p);
- }
-
- @Override
- public DrillOutputStream create(Path p) throws IOException {
- return new Out(fs.create(p));
- }
-
- @Override
- public DrillInputStream open(Path p) throws IOException {
- return new In(fs.open(p));
- }
-
- @Override
- public void close() throws Exception {
- fs.close();
- }
-
- @Override
- public BlockLocation[] getBlockLocations(FileStatus status, long start, long len) throws IOException {
- return fs.getFileBlockLocations(status, start, len);
- }
-
- private class Out extends DrillOutputStream {
-
- private final FSDataOutputStream out;
-
- public Out(FSDataOutputStream out) {
- super();
- this.out = out;
- }
-
- @Override
- public void close() throws Exception {
- out.close();
- }
-
- @Override
- public FSDataOutputStream getOutputStream() {
- return out;
- }
-
- }
-
- private class In extends DrillInputStream {
-
- private final FSDataInputStream in;
-
- public In(FSDataInputStream in) {
- super();
- this.in = in;
- }
-
- @Override
- public FSDataInputStream getInputStream() {
- return in;
- }
-
- @Override
- public void close() throws Exception {
- in.close();
- }
-
- }
-
- @Override
- public boolean mkdirs(Path folderPath) throws IOException {
- if (!fs.exists(folderPath)) {
- return fs.mkdirs(folderPath);
- } else if (!fs.getFileStatus(folderPath).isDirectory()) {
- throw new IOException("The specified folder path exists and is not a folder.");
- }
- return false;
- }
-
-}
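
FallbackFileSystem carried the only real logic in the shim package: recursive listing plus a guarded mkdirs. That behavior has to reappear inside the new DrillFileSystem. A self-contained sketch of the recursive-listing idiom using the plain Hadoop API (listStatus rather than globStatus, and the non-deprecated isDirectory()); the deleted code additionally filtered children through DrillPathFilter, which is omitted here:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative re-implementation of the deleted recursive listing.
    public final class RecursiveList {
      public static List<FileStatus> list(FileSystem fs, Path root) throws IOException {
        List<FileStatus> out = new ArrayList<>();
        collect(fs, fs.getFileStatus(root), out);
        return out;
      }

      private static void collect(FileSystem fs, FileStatus status, List<FileStatus> out)
          throws IOException {
        if (status.isDirectory()) {
          for (FileStatus child : fs.listStatus(status.getPath())) {
            collect(fs, child, out);  // descend into subdirectories
          }
        } else {
          out.add(status);            // leaf: a regular file
        }
      }
    }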
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index d41243d..6cf1ce5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -35,7 +35,7 @@ import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
import org.apache.hadoop.fs.FileSystem;
@@ -56,9 +56,9 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
}
@Override
- public RecordReader getRecordReader(FragmentContext context, FileWork fileWork,
+ public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
List<SchemaPath> columns) throws ExecutionSetupException {
- return new JSONRecordReader(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns);
+ return new JSONRecordReader(context, fileWork.getPath(), dfs, columns);
}
@Override
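
From here on, the hunks are call-site updates. getRecordReader now receives the scan's DrillFileSystem instead of reaching back through getFileSystem().getUnderlying(), so every reader created for a fragment shares one file system instance that can carry stats. The new override shape, taken from the hunk above (the body is the JSON case):

    @Override
    public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs,
        FileWork fileWork, List<SchemaPath> columns) throws ExecutionSetupException {
      // The reader sees the DrillFileSystem directly; no unwrapping step.
      return new JSONRecordReader(context, fileWork.getPath(), dfs, columns);
    }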
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 0070d18..557c0f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.complex.fn.JsonReader;
import org.apache.drill.exec.vector.complex.fn.JsonReader.ReadState;
@@ -54,8 +55,8 @@ public class JSONRecordReader extends AbstractRecordReader {
private OutputMutator mutator;
private VectorContainerWriter writer;
private Path hadoopPath;
- private FileSystem fileSystem;
private InputStream stream;
+ private DrillFileSystem fileSystem;
private JsonReader jsonReader;
private int recordCount;
private FragmentContext fragmentContext;
@@ -63,7 +64,7 @@ public class JSONRecordReader extends AbstractRecordReader {
private List<SchemaPath> columns;
private boolean enableAllTextMode;
- public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem,
+ public JSONRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
List<SchemaPath> columns) throws OutOfMemoryException {
this.hadoopPath = new Path(inputPath);
this.fileSystem = fileSystem;
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index b64a032..bf46395 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -39,7 +39,7 @@ import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.text.DrillTextRecordReader;
import org.apache.drill.exec.store.text.DrillTextRecordWriter;
import org.apache.hadoop.fs.FileSystem;
@@ -62,9 +62,9 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
@Override
- public RecordReader getRecordReader(FragmentContext context, FileWork fileWork,
+ public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
List<SchemaPath> columns) throws ExecutionSetupException {
- Path path = getFileSystem().getUnderlying().makeQualified(new Path(fileWork.getPath()));
+ Path path = dfs.makeQualified(new Path(fileWork.getPath()));
FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
Preconditions.checkArgument(((TextFormatConfig)formatConfig).getDelimiter().length() == 1, "Only single character delimiter supported");
return new DrillTextRecordReader(split, context, ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
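
The text plugin follows suit, and because DrillFileSystem presents the Hadoop FileSystem API, makeQualified is called on it directly when building the split. For reference, makeQualified resolves a scheme-less path against the file system's URI and working directory:

    // e.g. "/data/foo.csv" on an HDFS-backed dfs resolves to something
    // like "hdfs://namenode:8020/data/foo.csv"
    Path qualified = dfs.makeQualified(new Path(fileWork.getPath()));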
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 109033a..ab864db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -45,7 +45,7 @@ import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.MagicString;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.mock.MockStorageEngine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -79,7 +79,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
this.context = context;
- this.codecFactoryExposer = new CodecFactoryExposer(fs.getUnderlying().getConf());
+ this.codecFactoryExposer = new CodecFactoryExposer(fs.getConf());
this.config = formatConfig;
this.formatMatcher = new ParquetFormatMatcher(this, fs);
this.storageConfig = storageConfig;
@@ -88,7 +88,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
}
Configuration getHadoopConfig() {
- return fs.getUnderlying().getConf();
+ return fs.getConf();
}
public DrillFileSystem getFileSystem() {
@@ -219,13 +219,13 @@ public class ParquetFormatPlugin implements FormatPlugin{
boolean isDirReadable(FileStatus dir) {
Path p = new Path(dir.getPath(), ParquetFileWriter.PARQUET_METADATA_FILE);
try {
- if (fs.getUnderlying().exists(p)) {
+ if (fs.exists(p)) {
return true;
} else {
PathFilter filter = new DrillPathFilter();
- FileStatus[] files = fs.getUnderlying().listStatus(dir.getPath(), filter);
+ FileStatus[] files = fs.listStatus(dir.getPath(), filter);
if (files.length == 0) {
return false;
}
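
With getUnderlying() gone, ParquetFormatPlugin calls getConf(), exists() and listStatus() on the file system itself. The isDirReadable fast path above keys off Parquet's _metadata summary file; a self-contained sketch of that check, with a hypothetical helper name:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class ParquetDirCheck {
      // Mirrors the fast path above: a directory is cheaply readable as
      // Parquet when it carries the _metadata summary file, the constant
      // ParquetFileWriter.PARQUET_METADATA_FILE in parquet-mr.
      static boolean hasParquetSummary(FileSystem fs, Path dir) throws IOException {
        return fs.exists(new Path(dir, "_metadata"));
      }
    }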
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 8ddf5fd..28e9f4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -135,7 +135,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
Preconditions.checkNotNull(formatConfig);
this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
Preconditions.checkNotNull(formatPlugin);
- this.fs = formatPlugin.getFileSystem().getUnderlying();
+ this.fs = formatPlugin.getFileSystem();
this.formatConfig = formatPlugin.getConfig();
this.entries = entries;
this.selectionRoot = selectionRoot;
@@ -155,7 +155,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
this.formatPlugin = formatPlugin;
this.columns = columns;
this.formatConfig = formatPlugin.getConfig();
- this.fs = formatPlugin.getFileSystem().getUnderlying();
+ this.fs = formatPlugin.getFileSystem();
this.entries = Lists.newArrayList();
for (FileStatus file : files) {
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index dc1d892..ad1bf32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -29,12 +29,14 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.drill.exec.store.parquet2.DrillParquetReader;
import org.apache.hadoop.conf.Configuration;
@@ -64,6 +66,8 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
String partitionDesignator = context.getConfig().getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
List<SchemaPath> columns = rowGroupScan.getColumns();
List<RecordReader> readers = Lists.newArrayList();
+ OperatorContext oContext = new OperatorContext(rowGroupScan, context,
+ false /* ScanBatch is not subject to fragment memory limit */);
List<String[]> partitionColumns = Lists.newArrayList();
List<Integer> selectedPartitionColumns = Lists.newArrayList();
@@ -90,7 +94,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
rowGroupScan.setOperatorId(id);
}
- FileSystem fs = rowGroupScan.getStorageEngine().getFileSystem().getUnderlying();
+ DrillFileSystem fs = new DrillFileSystem(rowGroupScan.getStorageEngine().getFileSystem(), oContext.getStats());
Configuration conf = fs.getConf();
conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
@@ -123,7 +127,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
);
} else {
ParquetMetadata footer = footers.get(e.getPath());
- readers.add(new DrillParquetReader(context, footer, e, newColumns, conf));
+ readers.add(new DrillParquetReader(context, footer, e, newColumns, fs));
}
if (rowGroupScan.getSelectionRoot() != null) {
String[] r = rowGroupScan.getSelectionRoot().split("/");
@@ -149,7 +153,8 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
}
}
- ScanBatch s = new ScanBatch(rowGroupScan, context, readers.iterator(), partitionColumns, selectedPartitionColumns);
+ ScanBatch s =
+ new ScanBatch(rowGroupScan, context, oContext, readers.iterator(), partitionColumns, selectedPartitionColumns);
for(RecordReader r : readers){
r.setOperatorContext(s.getOperatorContext());
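
This hunk is the heart of DRILL-2080: an OperatorContext is created before the readers, the storage plugin's file system is re-wrapped as new DrillFileSystem(..., oContext.getStats()), and the same context is handed to ScanBatch, so IO performed by every reader is attributed to the scan operator's profile. A toy version of that wiring, reusing the CountingInputStream sketch from earlier (hypothetical names throughout):

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative wiring: one shared counter links the instrumented
    // streams to the operator profile that eventually reports them.
    final class ScanIoStatsDemo {
      static long drain(FileSystem fs, Path path) throws IOException {
        AtomicLong bytesRead = new AtomicLong();
        try (InputStream in = new CountingInputStream(fs.open(path), bytesRead)) {
          byte[] buf = new byte[8192];
          while (in.read(buf) != -1) {
            // a real reader would decode records here
          }
        }
        return bytesRead.get();  // what the scan operator would publish
      }
    }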
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 25f383f..8778ef8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.MaterializedField.Key;
import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.BaseValueVector;
@@ -82,7 +83,7 @@ public class DrillParquetReader extends AbstractRecordReader {
private ParquetMetadata footer;
private MessageType schema;
- private Configuration conf;
+ private DrillFileSystem fileSystem;
private RowGroupReadEntry entry;
private VectorContainerWriter writer;
private ColumnChunkIncReadStore pageReadStore;
@@ -111,9 +112,10 @@ public class DrillParquetReader extends AbstractRecordReader {
boolean noColumnsFound = false; // true if none of the columns in the projection list is found in the schema
- public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) {
+ public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry,
+ List<SchemaPath> columns, DrillFileSystem fileSystem) {
this.footer = footer;
- this.conf = conf;
+ this.fileSystem = fileSystem;
this.entry = entry;
setColumns(columns);
this.fragmentContext = fragmentContext;
@@ -243,8 +245,7 @@ public class DrillParquetReader extends AbstractRecordReader {
paths.put(md.getPath(), md);
}
- CodecFactoryExposer codecFactoryExposer = new CodecFactoryExposer(conf);
- FileSystem fs = FileSystem.get(conf);
+ CodecFactoryExposer codecFactoryExposer = new CodecFactoryExposer(fileSystem.getConf());
Path filePath = new Path(entry.getPath());
BlockMetaData blockMetaData = footer.getBlocks().get(entry.getRowGroupIndex());
@@ -252,7 +253,7 @@ public class DrillParquetReader extends AbstractRecordReader {
recordCount = (int) blockMetaData.getRowCount();
pageReadStore = new ColumnChunkIncReadStore(recordCount,
- codecFactoryExposer.getCodecFactory(), operatorContext.getAllocator(), fs, filePath);
+ codecFactoryExposer.getCodecFactory(), operatorContext.getAllocator(), fileSystem, filePath);
for (String[] path : schema.getPaths()) {
Type type = schema.getType(path);
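
Previously the reader built a private file system with FileSystem.get(conf), which would sidestep any accounting done by the scan's DrillFileSystem; now it reuses the instance it was handed and derives the Configuration from it:

    // Before: CodecFactoryExposer codecFactoryExposer = new CodecFactoryExposer(conf);
    //         FileSystem fs = FileSystem.get(conf);  // fresh, uncounted instance
    // After: the one instrumented file system flows end to end.
    CodecFactoryExposer codecFactoryExposer = new CodecFactoryExposer(fileSystem.getConf());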
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
index 40f25e7..baa998d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
@@ -23,7 +23,6 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -31,8 +30,7 @@ import java.util.Map.Entry;
import org.apache.commons.io.IOUtils;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.shim.FileSystemCreator;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.sys.PStore;
import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.hadoop.conf.Configuration;
@@ -65,7 +63,7 @@ public class FilePStore<V> implements PStore<V> {
}
private void mk(Path path) throws IOException{
- fs.getUnderlying().mkdirs(path);
+ fs.mkdirs(path);
}
public static Path getLogDir(){
@@ -84,8 +82,8 @@ public class FilePStore<V> implements PStore<V> {
}
- DrillFileSystem fs = FileSystemCreator.getFileSystem(config, fsConf);
- fs.getUnderlying().mkdirs(blobRoot);
+ DrillFileSystem fs = new DrillFileSystem(fsConf);
+ fs.mkdirs(blobRoot);
return fs;
}
@@ -128,14 +126,14 @@ public class FilePStore<V> implements PStore<V> {
public V get(String key) {
try{
Path path = p(key);
- if(!fs.getUnderlying().exists(path)){
+ if(!fs.exists(path)){
return null;
}
}catch(IOException e){
throw new RuntimeException(e);
}
- try (InputStream is = fs.open(p(key)).getInputStream()) {
+ try (InputStream is = fs.open(p(key))) {
return config.getSerializer().deserialize(IOUtils.toByteArray(is));
} catch (IOException e) {
throw new RuntimeException(e);
@@ -143,7 +141,7 @@ public class FilePStore<V> implements PStore<V> {
}
public void put(String key, V value) {
- try (OutputStream os = fs.create(p(key)).getOutputStream()) {
+ try (OutputStream os = fs.create(p(key))) {
IOUtils.write(config.getSerializer().serialize(value), os);
} catch (IOException e) {
throw new RuntimeException(e);
@@ -154,7 +152,7 @@ public class FilePStore<V> implements PStore<V> {
public boolean putIfAbsent(String key, V value) {
try {
Path p = p(key);
- if (fs.getUnderlying().exists(p)) {
+ if (fs.exists(p)) {
return false;
} else {
put(key, value);
@@ -167,7 +165,7 @@ public class FilePStore<V> implements PStore<V> {
public void delete(String key) {
try {
- fs.getUnderlying().delete(p(key), false);
+ fs.delete(p(key), false);
} catch (IOException e) {
throw new RuntimeException(e);
}
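
With open() and create() returning Hadoop streams directly, the persistent store's get/put collapse into plain try-with-resources around the serializer. A condensed, self-contained sketch of the resulting pattern (byte[] stands in for config.getSerializer()):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Condensed from the hunks above.
    final class FileStoreSketch {
      static void putBytes(FileSystem fs, Path p, byte[] value) throws IOException {
        try (OutputStream os = fs.create(p)) {
          IOUtils.write(value, os);
        }
      }

      static byte[] getBytes(FileSystem fs, Path p) throws IOException {
        if (!fs.exists(p)) {
          return null;  // absent key
        }
        try (InputStream is = fs.open(p)) {
          return IOUtils.toByteArray(is);
        }
      }
    }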
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
index ac53a61..3131290 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
@@ -23,8 +23,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.sys.EStore;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.sys.PStore;
import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.drill.exec.store.sys.PStoreProvider;