You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2016/09/13 01:32:26 UTC
[39/50] [abbrv] drill git commit: Refactoring code for better
organization.
Refactoring code for better organization.
+ Adding skeleton streams plugin.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c74d75ce
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c74d75ce
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c74d75ce
Branch: refs/heads/master
Commit: c74d75ce4aebc2fe188793d77dc5bb22542c9c4a
Parents: 58e95ca
Author: Aditya <ad...@mapr.com>
Authored: Fri Mar 4 15:17:36 2016 -0800
Committer: Aditya Kishore <ad...@apache.org>
Committed: Fri Sep 9 10:08:38 2016 -0700
----------------------------------------------------------------------
.../exec/store/mapr/TableFormatMatcher.java | 76 +++
.../exec/store/mapr/TableFormatPlugin.java | 138 +++++
.../store/mapr/TableFormatPluginConfig.java | 38 ++
.../exec/store/mapr/db/MapRDBFormatMatcher.java | 42 ++
.../exec/store/mapr/db/MapRDBFormatPlugin.java | 82 +++
.../store/mapr/db/MapRDBFormatPluginConfig.java | 68 +++
.../exec/store/mapr/db/MapRDBGroupScan.java | 283 +++++++++
.../store/mapr/db/MapRDBPushFilterIntoScan.java | 205 +++++++
.../store/mapr/db/MapRDBScanBatchCreator.java | 65 +++
.../drill/exec/store/mapr/db/MapRDBSubScan.java | 125 ++++
.../exec/store/mapr/db/MapRDBSubScanSpec.java | 114 ++++
.../exec/store/mapr/db/MapRDBTableStats.java | 46 ++
.../exec/store/mapr/db/TabletFragmentInfo.java | 108 ++++
.../mapr/db/binary/BinaryTableGroupScan.java | 214 +++++++
.../db/binary/CompareFunctionsProcessor.java | 547 ++++++++++++++++++
.../mapr/db/binary/MapRDBFilterBuilder.java | 355 ++++++++++++
.../mapr/db/json/CompareFunctionsProcessor.java | 222 ++++++++
.../mapr/db/json/JsonConditionBuilder.java | 240 ++++++++
.../exec/store/mapr/db/json/JsonScanSpec.java | 109 ++++
.../store/mapr/db/json/JsonSubScanSpec.java | 112 ++++
.../store/mapr/db/json/JsonTableGroupScan.java | 183 ++++++
.../mapr/db/json/MaprDBJsonRecordReader.java | 569 +++++++++++++++++++
.../exec/store/mapr/db/util/CommonFns.java | 26 +
.../mapr/streams/StreamsFormatMatcher.java | 42 ++
.../store/mapr/streams/StreamsFormatPlugin.java | 79 +++
.../mapr/streams/StreamsFormatPluginConfig.java | 39 ++
.../exec/store/maprdb/MapRDBFormatMatcher.java | 68 ---
.../exec/store/maprdb/MapRDBFormatPlugin.java | 173 ------
.../store/maprdb/MapRDBFormatPluginConfig.java | 82 ---
.../exec/store/maprdb/MapRDBGroupScan.java | 283 ---------
.../store/maprdb/MapRDBPushFilterIntoScan.java | 205 -------
.../store/maprdb/MapRDBScanBatchCreator.java | 65 ---
.../drill/exec/store/maprdb/MapRDBSubScan.java | 124 ----
.../exec/store/maprdb/MapRDBSubScanSpec.java | 114 ----
.../exec/store/maprdb/MapRDBTableStats.java | 46 --
.../exec/store/maprdb/TabletFragmentInfo.java | 108 ----
.../maprdb/binary/BinaryTableGroupScan.java | 216 -------
.../binary/CompareFunctionsProcessor.java | 547 ------------------
.../maprdb/binary/MapRDBFilterBuilder.java | 355 ------------
.../maprdb/json/CompareFunctionsProcessor.java | 222 --------
.../store/maprdb/json/JsonConditionBuilder.java | 240 --------
.../exec/store/maprdb/json/JsonScanSpec.java | 109 ----
.../exec/store/maprdb/json/JsonSubScanSpec.java | 112 ----
.../store/maprdb/json/JsonTableGroupScan.java | 184 ------
.../maprdb/json/MaprDBJsonRecordReader.java | 569 -------------------
.../drill/exec/store/maprdb/util/CommonFns.java | 26 -
.../drill/maprdb/tests/MaprDBTestsSuite.java | 9 +-
.../maprdb/tests/binary/TestMapRDBSimple.java | 2 +-
48 files changed, 4135 insertions(+), 3851 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java
new file mode 100644
index 0000000..192e57d
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.hadoop.fs.FileStatus;
+
+import com.mapr.fs.MapRFileStatus;
+
+public abstract class TableFormatMatcher extends FormatMatcher { // Base FormatMatcher for MapR tables; subclasses choose the accepted table kinds via isSupportedTable().
+
+ private final TableFormatPlugin plugin; // format plugin handed back by getFormatPlugin()
+
+ public TableFormatMatcher(TableFormatPlugin plugin) {
+ this.plugin = plugin;
+ }
+
+ @Override
+ public boolean supportDirectoryReads() {
+ return false; // this matcher never claims directory reads
+ }
+
+ public DrillTable isReadable(DrillFileSystem fs,
+ FileSelection selection, FileSystemPlugin fsPlugin,
+ String storageEngineName, String userName) throws IOException {
+ FileStatus status = selection.getFirstPath(fs); // only the status returned by getFirstPath() is checked
+ if (!isFileReadable(fs, status)) {
+ return null; // null tells the caller this matcher does not handle the selection
+ }
+
+ return new DynamicDrillTable(fsPlugin, storageEngineName, userName,
+ new FormatSelection(getFormatPlugin().getConfig(), selection));
+ }
+
+ @Override
+ public boolean isFileReadable(DrillFileSystem fs, FileStatus status) throws IOException {
+ return (status instanceof MapRFileStatus) // must be a MapR file status ...
+ && ((MapRFileStatus) status).isTable() // ... that reports itself as a table ...
+ && isSupportedTable((MapRFileStatus) status); // ... of a kind the concrete subclass accepts
+ }
+
+ @Override
+ public TableFormatPlugin getFormatPlugin() {
+ return plugin;
+ }
+
+ /**
+ * Returns true if the path pointed to by the MapRFileStatus is a table supported
+ * by this format plugin. The path must point to a MapR table.
+ */
+ protected abstract boolean isSupportedTable(MapRFileStatus status) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
new file mode 100644
index 0000000..b0131fd
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr;
+
+import static com.mapr.fs.jni.MapRConstants.MAPRFS_PREFIX;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.hadoop.conf.Configuration;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.ImmutableSet;
+import com.mapr.fs.MapRFileSystem;
+
+public abstract class TableFormatPlugin implements FormatPlugin { // Read-only base FormatPlugin for MapR tables; creates and holds a MapRFileSystem handle.
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+ .getLogger(TableFormatPlugin.class);
+
+ private final FileSystemConfig storageConfig;
+ private final TableFormatPluginConfig config;
+ private final Configuration fsConf;
+ private final DrillbitContext context;
+ private final String name;
+
+ private volatile FileSystemPlugin storagePlugin; // resolved lazily by getStoragePlugin()
+ private final MapRFileSystem maprfs; // created and initialized in the constructor
+
+ protected TableFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+ StoragePluginConfig storageConfig, TableFormatPluginConfig formatConfig) {
+ this.context = context;
+ this.config = formatConfig;
+ this.storageConfig = (FileSystemConfig) storageConfig; // throws ClassCastException for any other config type
+ this.fsConf = fsConf;
+ this.name = name == null ? "maprdb" : name; // falls back to "maprdb" when no name is supplied
+ try {
+ this.maprfs = new MapRFileSystem();
+ getMaprFS().initialize(new URI(MAPRFS_PREFIX), fsConf); // initialized against the MAPRFS_PREFIX URI with the given fsConf
+ } catch (IOException | URISyntaxException e) {
+ throw new RuntimeException(e); // checked setup failures surface as unchecked, cause preserved
+ }
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsWrite() {
+ return false; // consistent with getWriter(), which always throws
+ }
+
+ @Override
+ public boolean supportsAutoPartitioning() {
+ return false;
+ }
+
+ public Configuration getFsConf() {
+ return fsConf;
+ }
+
+ @Override
+ public Set<StoragePluginOptimizerRule> getOptimizerRules() {
+ return ImmutableSet.of(); // no rules at this level
+ }
+
+ @Override
+ public AbstractWriter getWriter(PhysicalOperator child, String location,
+ List<String> partitionColumns) throws IOException {
+ throw new UnsupportedOperationException(); // writing is not supported (see supportsWrite())
+ }
+
+ @Override
+ public FormatPluginConfig getConfig() {
+ return config;
+ }
+
+ @Override
+ public StoragePluginConfig getStorageConfig() {
+ return storageConfig;
+ }
+
+ @Override
+ public DrillbitContext getContext() {
+ return context;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ public synchronized FileSystemPlugin getStoragePlugin() { // looks the storage plugin up on first call, then returns the cached instance
+ if (this.storagePlugin == null) {
+ try {
+ this.storagePlugin = (FileSystemPlugin) (context.getStorage().getPlugin(storageConfig));
+ } catch (ExecutionSetupException e) {
+ throw new RuntimeException(e); // lookup failure is rethrown unchecked with its cause
+ }
+ }
+ return storagePlugin;
+ }
+
+ @JsonIgnore
+ public MapRFileSystem getMaprFS() {
+ return maprfs;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java
new file mode 100644
index 0000000..904cdb9
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+public abstract class TableFormatPluginConfig implements FormatPluginConfig { // Base config that centralizes the equals() preamble; subclasses compare fields in impEquals().
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true; // same instance
+ } else if (obj == null) {
+ return false;
+ } else if (getClass() != obj.getClass()) {
+ return false; // only instances of exactly the same class can be equal
+ }
+ return impEquals(obj); // obj is non-null and of this exact class here
+ }
+
+ protected abstract boolean impEquals(Object obj); // called by equals() only after the identity, null and class checks
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
new file mode 100644
index 0000000..4a5d118
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.store.mapr.TableFormatMatcher;
+import org.apache.drill.exec.store.mapr.TableFormatPlugin;
+
+import com.mapr.fs.MapRFileStatus;
+
+public class MapRDBFormatMatcher extends TableFormatMatcher { // Matcher that accepts every MapR table not flagged as a Marlin table.
+
+ public MapRDBFormatMatcher(TableFormatPlugin plugin) {
+ super(plugin);
+ }
+
+ @Override
+ protected boolean isSupportedTable(MapRFileStatus status) throws IOException {
+ return !getFormatPlugin() // negated: supported only when the Marlin flag is false
+ .getMaprFS()
+ .getTableProperties(status.getPath()) // properties fetched through the plugin's MapRFileSystem
+ .getAttr()
+ .getIsMarlinTable(); // NOTE(review): "Marlin" presumably denotes MapR Streams tables - confirm
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
new file mode 100644
index 0000000..9fe16e4
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.drill.exec.store.mapr.TableFormatPlugin;
+import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
+import org.apache.drill.exec.store.mapr.db.json.JsonScanSpec;
+import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.ImmutableSet;
+import com.mapr.fs.tables.TableProperties;
+
+public class MapRDBFormatPlugin extends TableFormatPlugin { // Format plugin for MapR-DB tables; builds a JSON or binary group scan depending on the table's properties.
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBFormatPlugin.class);
+
+ private final MapRDBFormatMatcher matcher;
+
+ public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+ StoragePluginConfig storageConfig, MapRDBFormatPluginConfig formatConfig) {
+ super(name, context, fsConf, storageConfig, formatConfig);
+ matcher = new MapRDBFormatMatcher(this);
+ }
+
+ @Override
+ public FormatMatcher getMatcher() {
+ return matcher;
+ }
+
+ @Override
+ @JsonIgnore
+ public Set<StoragePluginOptimizerRule> getOptimizerRules() {
+ return ImmutableSet.of(MapRDBPushFilterIntoScan.FILTER_ON_SCAN, MapRDBPushFilterIntoScan.FILTER_ON_PROJECT); // replaces the empty rule set of the base class
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+ List<SchemaPath> columns) throws IOException {
+ List<String> files = selection.getFiles();
+ assert (files.size() == 1); // exactly one table path is expected; checked only when assertions are enabled
+ String tableName = files.get(0);
+ TableProperties props = getMaprFS().getTableProperties(new Path(tableName));
+
+ if (props.getAttr().getJson()) { // JSON flag set: use the JSON scan
+ JsonScanSpec scanSpec = new JsonScanSpec(tableName, null/*condition*/);
+ return new JsonTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns);
+ } else { // otherwise use the binary scan, which takes an HBaseScanSpec
+ HBaseScanSpec scanSpec = new HBaseScanSpec(tableName);
+ return new BinaryTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
new file mode 100644
index 0000000..82b360c
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import org.apache.drill.exec.store.mapr.TableFormatPluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("maprdb") @JsonInclude(Include.NON_DEFAULT) // registered under the JSON type name "maprdb"
+public class MapRDBFormatPluginConfig extends TableFormatPluginConfig { // Config holding the two boolean reader options of the "maprdb" format.
+
+ private boolean allTextMode = false;
+ private boolean readAllNumbersAsDouble = false;
+
+ @Override
+ public int hashCode() {
+ return 53; // constant hash: stays consistent with equals() even though the fields are mutable via setters
+ }
+
+ @Override
+ protected boolean impEquals(Object obj) {
+ MapRDBFormatPluginConfig other = (MapRDBFormatPluginConfig)obj; // cast is safe: equals() has already checked the class
+ if (readAllNumbersAsDouble != other.readAllNumbersAsDouble) {
+ return false;
+ } else if (allTextMode != other.allTextMode) {
+ return false;
+ }
+
+ return true; // both options match
+ }
+
+ public boolean isReadAllNumbersAsDouble() {
+ return readAllNumbersAsDouble;
+ }
+
+ public boolean isAllTextMode() {
+ return allTextMode;
+ }
+
+ @JsonProperty("allTextMode")
+ public void setAllTextMode(boolean mode) {
+ allTextMode = mode;
+ }
+
+ @JsonProperty("readAllNumbersAsDouble")
+ public void setReadAllNumbersAsDouble(boolean read) {
+ readAllNumbersAsDouble = read;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
new file mode 100644
index 0000000..8563b78
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public abstract class MapRDBGroupScan extends AbstractGroupScan { // Shared planning logic (endpoint affinity, slot assignment) for MapR-DB group scans.
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class);
+
+ private FileSystemPlugin storagePlugin;
+
+ private MapRDBFormatPlugin formatPlugin;
+
+ protected MapRDBFormatPluginConfig formatPluginConfig;
+
+ protected List<SchemaPath> columns;
+
+ protected Map<Integer, List<MapRDBSubScanSpec>> endpointFragmentMapping; // slot index => sub-scan specs; built by applyAssignments()
+
+ protected NavigableMap<TabletFragmentInfo, String> regionsToScan; // fragment => host name, matched against DrillbitEndpoint.getAddress()
+
+ private boolean filterPushedDown = false;
+
+ private Stopwatch watch = Stopwatch.createUnstarted(); // reused to time the planning steps for debug logging
+
+ private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MapRDBSubScanSpec>>() {
+ @Override
+ public int compare(List<MapRDBSubScanSpec> list1, List<MapRDBSubScanSpec> list2) {
+ return list1.size() - list2.size(); // sizes are non-negative, so the subtraction cannot overflow
+ }
+ };
+
+ private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
+
+ public MapRDBGroupScan(MapRDBGroupScan that) { // copy constructor; endpointFragmentMapping is not copied
+ super(that);
+ this.columns = that.columns;
+ this.formatPlugin = that.formatPlugin;
+ this.formatPluginConfig = that.formatPluginConfig;
+ this.storagePlugin = that.storagePlugin;
+ this.regionsToScan = that.regionsToScan;
+ this.filterPushedDown = that.filterPushedDown;
+ }
+
+ public MapRDBGroupScan(FileSystemPlugin storagePlugin,
+ MapRDBFormatPlugin formatPlugin, List<SchemaPath> columns, String userName) {
+ super(userName);
+ this.storagePlugin = storagePlugin;
+ this.formatPlugin = formatPlugin;
+ this.formatPluginConfig = (MapRDBFormatPluginConfig)formatPlugin.getConfig();
+ this.columns = columns;
+ }
+
+ @Override
+ public List<EndpointAffinity> getOperatorAffinity() { // one affinity point per region hosted on a drillbit's address
+ watch.reset();
+ watch.start();
+ Map<String, DrillbitEndpoint> endpointMap = new HashMap<String, DrillbitEndpoint>();
+ for (DrillbitEndpoint ep : formatPlugin.getContext().getBits()) {
+ endpointMap.put(ep.getAddress(), ep);
+ }
+
+ Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
+ for (String serverName : regionsToScan.values()) {
+ DrillbitEndpoint ep = endpointMap.get(serverName);
+ if (ep != null) { // regions on hosts without a drillbit contribute no affinity
+ EndpointAffinity affinity = affinityMap.get(ep);
+ if (affinity == null) {
+ affinityMap.put(ep, new EndpointAffinity(ep, 1));
+ } else {
+ affinity.addAffinity(1);
+ }
+ }
+ }
+ logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS)/1000);
+ return Lists.newArrayList(affinityMap.values());
+ }
+
+ /**
+ * Distributes the regions in regionsToScan over the given endpoint slots, preferring a slot on the region's own host.
+ * @param incomingEndpoints one entry per slot; must not outnumber the regions to scan
+ */
+ @Override
+ public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+ watch.reset();
+ watch.start();
+
+ final int numSlots = incomingEndpoints.size();
+ Preconditions.checkArgument(numSlots <= regionsToScan.size(),
+ String.format("Incoming endpoints %d is greater than number of scan regions %d", numSlots, regionsToScan.size()));
+
+ /*
+ * Minimum/Maximum number of assignment per slot
+ */
+ final int minPerEndpointSlot = (int) Math.floor((double)regionsToScan.size() / numSlots);
+ final int maxPerEndpointSlot = (int) Math.ceil((double)regionsToScan.size() / numSlots);
+
+ /*
+ * initialize (endpoint index => MapRDBSubScanSpec list) map
+ */
+ endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots);
+
+ /*
+ * another map with endpoint (hostname => corresponding index list) in 'incomingEndpoints' list
+ */
+ Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
+
+ /*
+ * Initialize these two maps
+ */
+ for (int i = 0; i < numSlots; ++i) {
+ endpointFragmentMapping.put(i, new ArrayList<MapRDBSubScanSpec>(maxPerEndpointSlot));
+ String hostname = incomingEndpoints.get(i).getAddress();
+ Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
+ if (hostIndexQueue == null) {
+ hostIndexQueue = Lists.newLinkedList();
+ endpointHostIndexListMap.put(hostname, hostIndexQueue);
+ }
+ hostIndexQueue.add(i);
+ }
+
+ Set<Entry<TabletFragmentInfo, String>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet());
+
+ /*
+ * First, we assign regions which are hosted on region servers running on drillbit endpoints
+ */
+ for (Iterator<Entry<TabletFragmentInfo, String>> regionsIterator = regionsToAssignSet.iterator(); regionsIterator.hasNext(); /*nothing*/) {
+ Entry<TabletFragmentInfo, String> regionEntry = regionsIterator.next();
+ /*
+ * Test if there is a drillbit endpoint on the same host that serves the current region
+ */
+ Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue());
+ if (endpointIndexlist != null) {
+ Integer slotIndex = endpointIndexlist.poll();
+ List<MapRDBSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex);
+ endpointSlotScanList.add(getSubScanSpec(regionEntry.getKey()));
+ // add to the tail of the slot list, to add more later in round robin fashion
+ endpointIndexlist.offer(slotIndex);
+ // this region has been assigned
+ regionsIterator.remove();
+ }
+ }
+
+ /*
+ * Build priority queues of slots: those holding at most 'minPerEndpointSlot' tasks and those holding more.
+ */
+ PriorityQueue<List<MapRDBSubScanSpec>> minHeap = new PriorityQueue<List<MapRDBSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
+ PriorityQueue<List<MapRDBSubScanSpec>> maxHeap = new PriorityQueue<List<MapRDBSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
+ for(List<MapRDBSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
+ if (listOfScan.size() <= minPerEndpointSlot) { // '<=': slots exactly at the minimum can still take leftovers; with '<' minHeap could be empty and poll() below returned null
+ minHeap.offer(listOfScan);
+ } else if (listOfScan.size() > minPerEndpointSlot){
+ maxHeap.offer(listOfScan);
+ }
+ }
+
+ /*
+ * Now, let's process any regions which remain unassigned and assign them to slots with minimum number of assignments.
+ */
+ if (regionsToAssignSet.size() > 0) {
+ for (Entry<TabletFragmentInfo, String> regionEntry : regionsToAssignSet) {
+ List<MapRDBSubScanSpec> smallestList = minHeap.poll();
+ smallestList.add(getSubScanSpec(regionEntry.getKey()));
+ if (smallestList.size() < maxPerEndpointSlot) { // a slot that reached the maximum takes no more leftovers
+ minHeap.offer(smallestList);
+ }
+ }
+ }
+
+ /*
+ * While there are slots with lesser than 'minPerEndpointSlot' unit work, balance from those with more.
+ */
+ while(minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) {
+ List<MapRDBSubScanSpec> smallestList = minHeap.poll();
+ List<MapRDBSubScanSpec> largestList = maxHeap.poll();
+ smallestList.add(largestList.remove(largestList.size()-1)); // move one task from the fullest slot to the emptiest
+ if (largestList.size() > minPerEndpointSlot) {
+ maxHeap.offer(largestList);
+ }
+ if (smallestList.size() < minPerEndpointSlot) {
+ minHeap.offer(smallestList);
+ }
+ }
+
+ /* no slot should be empty at this point; String.format needs %s, not SLF4J-style {} */
+ assert (minHeap.peek() == null || minHeap.peek().size() > 0) : String.format(
+ "Unable to assign tasks to some endpoints.\nEndpoints: %s.\nAssignment Map: %s.",
+ incomingEndpoints, endpointFragmentMapping.toString());
+
+ logger.debug("Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}",
+ watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString());
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ return regionsToScan.size(); // at most one fragment per region
+ }
+
+ @JsonIgnore
+ public MapRDBFormatPlugin getFormatPlugin() {
+ return formatPlugin;
+ }
+
+ @Override
+ public String getDigest() {
+ return toString();
+ }
+
+ @JsonProperty("storage")
+ public FileSystemConfig getStorageConfig() {
+ return (FileSystemConfig) storagePlugin.getConfig();
+ }
+
+ @JsonIgnore
+ public FileSystemPlugin getStoragePlugin(){
+ return storagePlugin;
+ }
+
+ @JsonProperty
+ public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ @JsonIgnore
+ public boolean canPushdownProjects(List<SchemaPath> columns) {
+ return true;
+ }
+
+ @JsonIgnore
+ public void setFilterPushedDown(boolean b) {
+ this.filterPushedDown = b; // store the caller's flag; previously this always stored true
+ }
+
+ @JsonIgnore
+ public boolean isFilterPushedDown() {
+ return filterPushedDown;
+ }
+
+ protected abstract MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo key); // builds the sub-scan spec for one region/fragment
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
new file mode 100644
index 0000000..7292182
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
+import org.apache.drill.exec.store.mapr.db.binary.MapRDBFilterBuilder;
+import org.apache.drill.exec.store.mapr.db.json.JsonConditionBuilder;
+import org.apache.drill.exec.store.mapr.db.json.JsonScanSpec;
+import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
+
+import com.google.common.collect.ImmutableList;
+
+public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushFilterIntoScan.class);
+
+ private MapRDBPushFilterIntoScan(RelOptRuleOperand operand, String description) {
+ super(operand, description);
+ }
+
+ public static final StoragePluginOptimizerRule FILTER_ON_SCAN = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushFilterIntoScan:Filter_On_Scan") {
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final ScanPrel scan = (ScanPrel) call.rel(1);
+ final FilterPrel filter = (FilterPrel) call.rel(0);
+ final RexNode condition = filter.getCondition();
+
+ if (scan.getGroupScan() instanceof BinaryTableGroupScan) {
+ BinaryTableGroupScan groupScan = (BinaryTableGroupScan)scan.getGroupScan();
+ doPushFilterIntoBinaryGroupScan(call, filter, null, scan, groupScan, condition);
+ } else {
+ assert(scan.getGroupScan() instanceof JsonTableGroupScan);
+ JsonTableGroupScan groupScan = (JsonTableGroupScan)scan.getGroupScan();
+ doPushFilterIntoJsonGroupScan(call, filter, null, scan, groupScan, condition);
+ }
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final ScanPrel scan = (ScanPrel) call.rel(1);
+ if (scan.getGroupScan() instanceof BinaryTableGroupScan ||
+ scan.getGroupScan() instanceof JsonTableGroupScan) {
+ return super.matches(call);
+ }
+ return false;
+ }
+ };
+
+ public static final StoragePluginOptimizerRule FILTER_ON_PROJECT = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))), "MapRDBPushFilterIntoScan:Filter_On_Project") {
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final ScanPrel scan = (ScanPrel) call.rel(2);
+ final ProjectPrel project = (ProjectPrel) call.rel(1);
+ final FilterPrel filter = (FilterPrel) call.rel(0);
+
+ // convert the filter to one that references the child of the project
+ final RexNode condition = RelOptUtil.pushPastProject(filter.getCondition(), project);
+
+ if (scan.getGroupScan() instanceof BinaryTableGroupScan) {
+ BinaryTableGroupScan groupScan = (BinaryTableGroupScan)scan.getGroupScan();
+ doPushFilterIntoBinaryGroupScan(call, filter, project, scan, groupScan, condition);
+ } else {
+ assert(scan.getGroupScan() instanceof JsonTableGroupScan);
+ JsonTableGroupScan groupScan = (JsonTableGroupScan)scan.getGroupScan();
+ doPushFilterIntoJsonGroupScan(call, filter, project, scan, groupScan, condition);
+ }
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final ScanPrel scan = (ScanPrel) call.rel(2);
+ if (scan.getGroupScan() instanceof BinaryTableGroupScan ||
+ scan.getGroupScan() instanceof JsonTableGroupScan) {
+ return super.matches(call);
+ }
+ return false;
+ }
+ };
+
+ protected void doPushFilterIntoJsonGroupScan(RelOptRuleCall call,
+ FilterPrel filter, final ProjectPrel project, ScanPrel scan,
+ JsonTableGroupScan groupScan, RexNode condition) {
+
+ if (groupScan.isFilterPushedDown()) {
+ /*
+ * The rule can get triggered again due to the transformed "scan => filter" sequence
+ * created by the earlier execution of this rule when we could not do a complete
+ * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
+ * this flag to not do a re-processing of the rule on the already transformed call.
+ */
+ return;
+ }
+
+ LogicalExpression conditionExp = null;
+ try {
+ conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
+ } catch (ClassCastException e) {
+ // MD-771 bug in DrillOptiq.toDrill() causes filter condition on ITEM operator to throw ClassCastException
+ // For such cases, we return without pushdown
+ return;
+ }
+ final JsonConditionBuilder jsonConditionBuilder = new JsonConditionBuilder(groupScan, conditionExp);
+ final JsonScanSpec newScanSpec = jsonConditionBuilder.parseTree();
+ if (newScanSpec == null) {
+ return; //no filter pushdown ==> No transformation.
+ }
+
+ final JsonTableGroupScan newGroupsScan = new JsonTableGroupScan(groupScan.getUserName(),
+ groupScan.getStoragePlugin(),
+ groupScan.getFormatPlugin(),
+ newScanSpec,
+ groupScan.getColumns());
+ newGroupsScan.setFilterPushedDown(true);
+
+ final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
+
+ // Depending on whether is a project in the middle, assign either scan or copy of project to childRel.
+ final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));;
+
+ if (jsonConditionBuilder.isAllExpressionsConverted()) {
+ /*
+ * Since we could convert the entire filter condition expression into an HBase filter,
+ * we can eliminate the filter operator altogether.
+ */
+ call.transformTo(childRel);
+ } else {
+ call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
+ }
+ }
+
+ protected void doPushFilterIntoBinaryGroupScan(final RelOptRuleCall call,
+ final FilterPrel filter,
+ final ProjectPrel project,
+ final ScanPrel scan,
+ final BinaryTableGroupScan groupScan,
+ final RexNode condition) {
+
+ if (groupScan.isFilterPushedDown()) {
+ /*
+ * The rule can get triggered again due to the transformed "scan => filter" sequence
+ * created by the earlier execution of this rule when we could not do a complete
+ * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
+ * this flag to not do a re-processing of the rule on the already transformed call.
+ */
+ return;
+ }
+
+ final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
+ final MapRDBFilterBuilder maprdbFilterBuilder = new MapRDBFilterBuilder(groupScan, conditionExp);
+ final HBaseScanSpec newScanSpec = maprdbFilterBuilder.parseTree();
+ if (newScanSpec == null) {
+ return; //no filter pushdown ==> No transformation.
+ }
+
+ final BinaryTableGroupScan newGroupsScan = new BinaryTableGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
+ groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns());
+ newGroupsScan.setFilterPushedDown(true);
+
+ final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
+
+ // Depending on whether is a project in the middle, assign either scan or copy of project to childRel.
+ final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));;
+
+ if (maprdbFilterBuilder.isAllExpressionsConverted()) {
+ /*
+ * Since we could convert the entire filter condition expression into an HBase filter,
+ * we can eliminate the filter operator altogether.
+ */
+ call.transformTo(childRel);
+ } else {
+ call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
new file mode 100644
index 0000000..1d51223
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.hbase.HBaseRecordReader;
+import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
+import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
+import org.apache.drill.exec.store.mapr.db.json.MaprDBJsonRecordReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Builds the {@link ScanBatch} for a {@link MapRDBSubScan}, creating one record
+ * reader per entry of the sub-scan's region scan spec list.
+ */
+public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBScanBatchCreator.class);
+
+  /**
+   * @param context  the fragment context handed to every reader and to the batch
+   * @param subScan  the sub-scan describing what to read
+   * @param children must be empty
+   * @return a scan batch iterating over the created readers
+   * @throws ExecutionSetupException if creating any reader fails
+   */
+  @Override
+  public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    final List<RecordReader> readers = Lists.newArrayList();
+    final Configuration conf = HBaseConfiguration.create();
+    for (final MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()) {
+      readers.add(newReader(conf, context, subScan, scanSpec));
+    }
+    return new ScanBatch(subScan, context, readers.iterator());
+  }
+
+  /**
+   * Creates the reader for one scan spec: an HBase reader when the sub-scan's
+   * table type equals {@code BinaryTableGroupScan.TABLE_BINARY}, a JSON reader otherwise.
+   * Any exception raised while constructing the reader is wrapped.
+   */
+  private RecordReader newReader(Configuration conf, FragmentContext context,
+      MapRDBSubScan subScan, MapRDBSubScanSpec scanSpec) throws ExecutionSetupException {
+    try {
+      final boolean binaryTable = BinaryTableGroupScan.TABLE_BINARY.equals(subScan.getTableType());
+      if (!binaryTable) {
+        return new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPluginConfig(), subScan.getColumns(), context);
+      }
+      return new HBaseRecordReader(conf, getHBaseSubScanSpec(scanSpec), subScan.getColumns(), context);
+    } catch (Exception e1) {
+      throw new ExecutionSetupException(e1);
+    }
+  }
+
+  /**
+   * Copies the fields of a MapR-DB scan spec into an {@link HBaseSubScanSpec};
+   * the last constructor argument is passed as {@code null}.
+   */
+  private HBaseSubScanSpec getHBaseSubScanSpec(MapRDBSubScanSpec scanSpec) {
+    final HBaseSubScanSpec hbaseSpec = new HBaseSubScanSpec(
+        scanSpec.getTableName(),
+        scanSpec.getRegionServer(),
+        scanSpec.getStartRow(),
+        scanSpec.getStopRow(),
+        scanSpec.getSerializedFilter(),
+        null);
+    return hbaseSpec;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
new file mode 100644
index 0000000..dea6867
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+// Class containing information for reading a single HBase region
+@JsonTypeName("maprdb-sub-scan")
+public class MapRDBSubScan extends AbstractBase implements SubScan {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBSubScan.class);
+
+ @JsonProperty
+ public final StoragePluginConfig storage;
+ @JsonIgnore
+ private final MapRDBFormatPluginConfig fsFormatPluginConfig;
+ private final FileSystemPlugin fsStoragePlugin;
+ private final List<MapRDBSubScanSpec> regionScanSpecList;
+ private final List<SchemaPath> columns;
+ private final String tableType;
+
+ @JsonCreator
+ public MapRDBSubScan(@JacksonInject StoragePluginRegistry registry,
+ @JsonProperty("userName") String userName,
+ @JsonProperty("formatPluginConfig") MapRDBFormatPluginConfig formatPluginConfig,
+ @JsonProperty("storage") StoragePluginConfig storage,
+ @JsonProperty("regionScanSpecList") List<MapRDBSubScanSpec> regionScanSpecList,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("tableType") String tableType) throws ExecutionSetupException {
+ super(userName);
+ this.fsFormatPluginConfig = formatPluginConfig;
+ this.fsStoragePlugin = (FileSystemPlugin) registry.getPlugin(storage);
+ this.regionScanSpecList = regionScanSpecList;
+ this.storage = storage;
+ this.columns = columns;
+ this.tableType = tableType;
+ }
+
+ public MapRDBSubScan(String userName, MapRDBFormatPluginConfig formatPluginConfig, FileSystemPlugin storagePlugin, StoragePluginConfig config,
+ List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType) {
+ super(userName);
+ fsFormatPluginConfig = formatPluginConfig;
+ fsStoragePlugin = storagePlugin;
+ storage = config;
+ this.regionScanSpecList = maprSubScanSpecs;
+ this.columns = columns;
+ this.tableType = tableType;
+ }
+
+ public List<MapRDBSubScanSpec> getRegionScanSpecList() {
+ return regionScanSpecList;
+ }
+
+ public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ @Override
+ public boolean isExecutable() {
+ return false;
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitSubScan(this, value);
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return new MapRDBSubScan(getUserName(), fsFormatPluginConfig, fsStoragePlugin, storage, regionScanSpecList, columns, tableType);
+ }
+
+ @Override
+ public Iterator<PhysicalOperator> iterator() {
+ return ImmutableSet.<PhysicalOperator>of().iterator();
+ }
+
+ @Override
+ public int getOperatorType() {
+ return 1001;
+ }
+
+ public String getTableType() {
+ return tableType;
+ }
+
+ public MapRDBFormatPluginConfig getFormatPluginConfig() {
+ return fsFormatPluginConfig;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScanSpec.java
new file mode 100644
index 0000000..3ffe47c
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScanSpec.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.mapr.fs.jni.MapRConstants;
+import com.mapr.org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Mutable description of one sub-scan: table name, region server, start and
+ * stop row keys and an optional serialized filter. Setters return {@code this}
+ * so that calls can be chained.
+ */
+public class MapRDBSubScanSpec {
+
+  protected String tableName;
+  protected String regionServer;
+  protected byte[] startRow;
+  protected byte[] stopRow;
+  protected byte[] serializedFilter;
+
+  /**
+   * JSON creator.
+   *
+   * @throws IllegalArgumentException if both {@code serializedFilter} and
+   *         {@code filterString} are non-null
+   */
+  @JsonCreator
+  public MapRDBSubScanSpec(@JsonProperty("tableName") String tableName,
+      @JsonProperty("regionServer") String regionServer,
+      @JsonProperty("startRow") byte[] startRow,
+      @JsonProperty("stopRow") byte[] stopRow,
+      @JsonProperty("serializedFilter") byte[] serializedFilter,
+      @JsonProperty("filterString") String filterString) {
+    final boolean bothFiltersGiven = serializedFilter != null && filterString != null;
+    if (bothFiltersGiven) {
+      throw new IllegalArgumentException("The parameters 'serializedFilter' or 'filterString' cannot be specified at the same time.");
+    }
+    // NOTE(review): filterString is only validated above and is never stored or parsed here.
+    this.serializedFilter = serializedFilter;
+    this.stopRow = stopRow;
+    this.startRow = startRow;
+    this.regionServer = regionServer;
+    this.tableName = tableName;
+  }
+
+  /* package */ MapRDBSubScanSpec() {
+    // Intentionally empty: fields are populated afterwards through the chained setters.
+  }
+
+  public String getTableName() {
+    return this.tableName;
+  }
+
+  public MapRDBSubScanSpec setTableName(String tableName) {
+    this.tableName = tableName;
+    return this;
+  }
+
+  public String getRegionServer() {
+    return this.regionServer;
+  }
+
+  public MapRDBSubScanSpec setRegionServer(String regionServer) {
+    this.regionServer = regionServer;
+    return this;
+  }
+
+  /**
+   * @return the raw (not-encoded) start row key for this sub-scan, or the
+   *         shared empty byte array when none has been set
+   */
+  public byte[] getStartRow() {
+    if (startRow == null) {
+      return MapRConstants.EMPTY_BYTE_ARRAY;
+    }
+    return startRow;
+  }
+
+  public MapRDBSubScanSpec setStartRow(byte[] startRow) {
+    this.startRow = startRow;
+    return this;
+  }
+
+  /**
+   * @return the raw (not-encoded) stop row key for this sub-scan, or the
+   *         shared empty byte array when none has been set
+   */
+  public byte[] getStopRow() {
+    if (stopRow == null) {
+      return MapRConstants.EMPTY_BYTE_ARRAY;
+    }
+    return stopRow;
+  }
+
+  public MapRDBSubScanSpec setStopRow(byte[] stopRow) {
+    this.stopRow = stopRow;
+    return this;
+  }
+
+  /** @return the serialized filter bytes, possibly {@code null} */
+  public byte[] getSerializedFilter() {
+    return this.serializedFilter;
+  }
+
+  public MapRDBSubScanSpec setSerializedFilter(byte[] serializedFilter) {
+    this.serializedFilter = serializedFilter;
+    return this;
+  }
+
+  /**
+   * Renders the row keys in binary-string form and the filter in Base64;
+   * unset values are printed as {@code null}.
+   */
+  @Override
+  public String toString() {
+    final String startText = (startRow == null) ? null : Bytes.toStringBinary(startRow);
+    final String stopText = (stopRow == null) ? null : Bytes.toStringBinary(stopRow);
+    final String filterText = (getSerializedFilter() == null) ? null : Bytes.toBase64(getSerializedFilter());
+    return "MapRDBSubScanSpec [tableName=" + tableName
+        + ", startRow=" + startText
+        + ", stopRow=" + stopText
+        + ", filter=" + filterText
+        + ", regionServer=" + regionServer + "]";
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBTableStats.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBTableStats.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBTableStats.java
new file mode 100644
index 0000000..162776c
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBTableStats.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.mapr.TableMappingRulesFactory;
+
+import com.mapr.fs.hbase.HBaseAdminImpl;
+
+/**
+ * Holds the row count of one table, obtained at construction time from a
+ * lazily created {@link HBaseAdminImpl} that is shared by all instances.
+ */
+public class MapRDBTableStats {
+  private static volatile HBaseAdminImpl admin = null;
+
+  private long numRows;
+
+  /**
+   * @param conf      configuration used to create the shared admin on first use
+   * @param tablePath path of the table whose row count is fetched
+   * @throws Exception if creating the admin or fetching the row count fails
+   */
+  public MapRDBTableStats(Configuration conf, String tablePath) throws Exception {
+    ensureAdmin(conf);
+    numRows = admin.getNumRows(tablePath);
+  }
+
+  /**
+   * Creates the shared admin if it does not exist yet. The volatile field is
+   * checked once without the lock and once more while holding the class lock,
+   * so only the first caller's configuration is ever used.
+   */
+  private static void ensureAdmin(Configuration conf) throws Exception {
+    if (admin != null) {
+      return;
+    }
+    synchronized (MapRDBTableStats.class) {
+      if (admin != null) {
+        return;
+      }
+      admin = new HBaseAdminImpl(conf, TableMappingRulesFactory.create(conf));
+    }
+  }
+
+  /** @return the row count captured when this object was constructed */
+  public long getNumRows() {
+    return this.numRows;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/TabletFragmentInfo.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/TabletFragmentInfo.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/TabletFragmentInfo.java
new file mode 100644
index 0000000..e71c67c
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/TabletFragmentInfo.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+
+import com.mapr.db.impl.TabletInfoImpl;
+
+public class TabletFragmentInfo implements Comparable<TabletFragmentInfo> {
+
+ final private HRegionInfo regionInfo;
+ final private TabletInfoImpl tabletInfoImpl;
+
+ public TabletFragmentInfo(HRegionInfo regionInfo) {
+ this(null, regionInfo);
+ }
+
+ public TabletFragmentInfo(TabletInfoImpl tabletInfoImpl) {
+ this(tabletInfoImpl, null);
+ }
+
+ TabletFragmentInfo(TabletInfoImpl tabletInfoImpl, HRegionInfo regionInfo) {
+ this.regionInfo = regionInfo;
+ this.tabletInfoImpl = tabletInfoImpl;
+ }
+
+ public HRegionInfo getRegionInfo() {
+ return regionInfo;
+ }
+
+ public TabletInfoImpl getTabletInfoImpl() {
+ return tabletInfoImpl;
+ }
+
+ public boolean containsRow(byte[] row) {
+ return tabletInfoImpl != null ? tabletInfoImpl.containsRow(row) :
+ regionInfo.containsRow(row);
+ }
+
+ public byte[] getStartKey() {
+ return tabletInfoImpl != null ? tabletInfoImpl.getStartRow() :
+ regionInfo.getStartKey();
+ }
+
+ public byte[] getEndKey() {
+ return tabletInfoImpl != null ? tabletInfoImpl.getStopRow() :
+ regionInfo.getEndKey();
+ }
+
+ @Override
+ public int compareTo(TabletFragmentInfo o) {
+ return tabletInfoImpl != null ? tabletInfoImpl.compareTo(o.tabletInfoImpl) :
+ regionInfo.compareTo(o.regionInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((regionInfo == null) ? 0 : regionInfo.hashCode());
+ result = prime * result + ((tabletInfoImpl == null) ? 0 : tabletInfoImpl.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ TabletFragmentInfo other = (TabletFragmentInfo) obj;
+ if (regionInfo == null) {
+ if (other.regionInfo != null)
+ return false;
+ } else if (!regionInfo.equals(other.regionInfo))
+ return false;
+ if (tabletInfoImpl == null) {
+ if (other.tabletInfoImpl != null)
+ return false;
+ } else if (!tabletInfoImpl.equals(other.tabletInfoImpl))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "TabletFragmentInfo [regionInfo=" + regionInfo + ", tabletInfoImpl=" + tabletInfoImpl
+ + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
new file mode 100644
index 0000000..a597995
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.binary;
+
+import static org.apache.drill.exec.store.mapr.db.util.CommonFns.isNullOrEmpty;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
+import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
+import org.apache.drill.exec.store.mapr.db.MapRDBGroupScan;
+import org.apache.drill.exec.store.mapr.db.MapRDBSubScan;
+import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
+import org.apache.drill.exec.store.mapr.db.MapRDBTableStats;
+import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.codehaus.jackson.annotate.JsonCreator;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+ /**
+  * Group scan for a MapR-DB binary table described by an {@link HBaseScanSpec}.
+  *
+  * <p>Construction looks up the table's regions through the HBase client API and records in
+  * {@code regionsToScan} every region from the one containing the scan's start row through the
+  * one containing its stop row, keyed by {@link TabletFragmentInfo} and mapped to the hostname
+  * of the server hosting the region.
+  */
+ @JsonTypeName("maprdb-binary-scan")
+ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseConstants {
+   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BinaryTableGroupScan.class);
+
+   /** Table type tag passed to {@link MapRDBSubScan} by {@link #getSpecificScan(int)}. */
+   public static final String TABLE_BINARY = "binary";
+
+   private HBaseScanSpec hbaseScanSpec;
+
+   private HTableDescriptor hTableDesc;
+
+   private MapRDBTableStats tableStats;
+
+   /**
+    * Deserialization constructor: resolves the storage and format plugins from the injected
+    * registry and delegates to the plugin-based constructor.
+    *
+    * @param userName name of the user on whose behalf the scan runs
+    * @param scanSpec table name, row range and filter of the scan
+    * @param storagePluginConfig configuration used to look up the {@link FileSystemPlugin}
+    * @param formatPluginConfig configuration used to look up the {@link MapRDBFormatPlugin}
+    * @param columns projected columns
+    * @param pluginRegistry registry the plugins are resolved from
+    * @throws IOException declared for callers; not thrown directly by this constructor
+    * @throws ExecutionSetupException if a plugin cannot be resolved
+    */
+   // Fully qualified on purpose: the simple name JsonCreator is imported from Jackson 1.x
+   // (org.codehaus.jackson), which the Jackson 2.x (com.fasterxml) annotations used everywhere
+   // else in this class do not interoperate with.
+   @com.fasterxml.jackson.annotation.JsonCreator
+   public BinaryTableGroupScan(@JsonProperty("userName") final String userName,
+                               @JsonProperty("hbaseScanSpec") HBaseScanSpec scanSpec,
+                               @JsonProperty("storage") FileSystemConfig storagePluginConfig,
+                               @JsonProperty("format") MapRDBFormatPluginConfig formatPluginConfig,
+                               @JsonProperty("columns") List<SchemaPath> columns,
+                               @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
+     this (userName,
+           (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig),
+           (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
+           scanSpec, columns);
+   }
+
+   /**
+    * Creates the scan and immediately loads region information for the table.
+    *
+    * @param userName name of the user on whose behalf the scan runs
+    * @param storagePlugin file system plugin the table lives under
+    * @param formatPlugin MapR-DB format plugin
+    * @param scanSpec table name, row range and filter of the scan
+    * @param columns projected columns
+    * @throws DrillRuntimeException if region information cannot be obtained
+    */
+   public BinaryTableGroupScan(String userName, FileSystemPlugin storagePlugin,
+       MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
+     super(storagePlugin, formatPlugin, columns, userName);
+     this.hbaseScanSpec = scanSpec;
+     init();
+   }
+
+   /**
+    * Private constructor, used for cloning. Shares the already-loaded table metadata with
+    * the source instead of querying the table again.
+    * @param that The BinaryTableGroupScan to clone
+    */
+   private BinaryTableGroupScan(BinaryTableGroupScan that) {
+     super(that);
+     this.hbaseScanSpec = that.hbaseScanSpec;
+     this.endpointFragmentMapping = that.endpointFragmentMapping;
+     this.hTableDesc = that.hTableDesc;
+     this.tableStats = that.tableStats;
+   }
+
+   /**
+    * Returns a copy of this scan that projects the given columns.
+    *
+    * @param columns the columns the copy should project
+    * @return the new group scan
+    */
+   @Override
+   public GroupScan clone(List<SchemaPath> columns) {
+     BinaryTableGroupScan newScan = new BinaryTableGroupScan(this);
+     newScan.columns = columns;
+     newScan.verifyColumns();
+     return newScan;
+   }
+
+   /**
+    * Loads table statistics, the table descriptor and the region locations, then fills
+    * {@code regionsToScan} with the regions overlapping the scan's row range.
+    *
+    * @throws DrillRuntimeException wrapping any failure while talking to the table
+    */
+   private void init() {
+     logger.debug("Getting region locations");
+     try {
+       Configuration conf = HBaseConfiguration.create();
+       final NavigableMap<HRegionInfo, ServerName> regionsMap;
+       HTable table = new HTable(conf, hbaseScanSpec.getTableName());
+       try {
+         tableStats = new MapRDBTableStats(conf, hbaseScanSpec.getTableName());
+         this.hTableDesc = table.getTableDescriptor();
+         regionsMap = table.getRegionLocations();
+       } finally {
+         // Release the table handle even when one of the lookups above fails.
+         table.close();
+       }
+
+       // The row range does not change while iterating, so evaluate it once up front.
+       final byte[] startRow = hbaseScanSpec.getStartRow();
+       final byte[] stopRow = hbaseScanSpec.getStopRow();
+       final boolean hasStartRow = !isNullOrEmpty(startRow);
+       final boolean hasStopRow = !isNullOrEmpty(stopRow);
+
+       boolean foundStartRegion = false;
+       regionsToScan = new TreeMap<TabletFragmentInfo, String>();
+       for (Entry<HRegionInfo, ServerName> mapEntry : regionsMap.entrySet()) {
+         HRegionInfo regionInfo = mapEntry.getKey();
+         // Skip leading regions until the one holding the start row is reached.
+         if (!foundStartRegion && hasStartRow && !regionInfo.containsRow(startRow)) {
+           continue;
+         }
+         foundStartRegion = true;
+         regionsToScan.put(new TabletFragmentInfo(regionInfo), mapEntry.getValue().getHostname());
+         // The region holding the stop row is the last one that needs scanning.
+         if (hasStopRow && regionInfo.containsRow(stopRow)) {
+           break;
+         }
+       }
+     } catch (Exception e) {
+       throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e);
+     }
+     verifyColumns();
+   }
+
+   /**
+    * Placeholder for column-family validation. The check is currently disabled; the original
+    * logic is kept below for reference.
+    */
+   private void verifyColumns() {
+     /*
+     if (columns != null) {
+       for (SchemaPath column : columns) {
+         if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
+           DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .",
+               column.getRootSegment().getPath(), hTableDesc.getNameAsString());
+         }
+       }
+     }
+     */
+   }
+
+   /**
+    * Builds the sub-scan spec for one fragment, clamping the fragment's key range to the
+    * scan's start/stop row when that row falls inside the fragment.
+    *
+    * @param tfi the fragment to build the spec for; must be a key of {@code regionsToScan}
+    * @return the sub-scan spec covering the fragment
+    */
+   protected MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
+     HBaseScanSpec spec = hbaseScanSpec;
+     MapRDBSubScanSpec subScanSpec = new MapRDBSubScanSpec(
+         spec.getTableName(),
+         regionsToScan.get(tfi),
+         (!isNullOrEmpty(spec.getStartRow()) && tfi.containsRow(spec.getStartRow())) ? spec.getStartRow() : tfi.getStartKey(),
+         (!isNullOrEmpty(spec.getStopRow()) && tfi.containsRow(spec.getStopRow())) ? spec.getStopRow() : tfi.getEndKey(),
+         spec.getSerializedFilter(),
+         null);
+     return subScanSpec;
+   }
+
+   /**
+    * Creates the sub-scan for one minor fragment from its entry in
+    * {@code endpointFragmentMapping}.
+    *
+    * @param minorFragmentId id of the minor fragment; must be below the mapping size
+    * @return the sub-scan for that fragment
+    */
+   @Override
+   public MapRDBSubScan getSpecificScan(int minorFragmentId) {
+     assert minorFragmentId < endpointFragmentMapping.size() : String.format(
+         "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
+         minorFragmentId);
+     return new MapRDBSubScan(getUserName(), formatPluginConfig, getStoragePlugin(), getStoragePlugin().getConfig(),
+         endpointFragmentMapping.get(minorFragmentId), columns, TABLE_BINARY);
+   }
+
+   /**
+    * Estimates scan cost from the table's row count. The row count is halved when the scan
+    * carries a filter, and the data size assumes 10 bytes per column with 100 columns when
+    * no projection is given.
+    *
+    * @return the (inexact) scan statistics
+    */
+   @Override
+   public ScanStats getScanStats() {
+     //TODO: look at stats for this.
+     long rowCount = (long) ((hbaseScanSpec.getFilter() != null ? .5 : 1) * tableStats.getNumRows());
+     int avgColumnSize = 10;
+     int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size();
+     return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount);
+   }
+
+   /**
+    * Returns a copy of this scan; a group scan is a leaf, so {@code children} must be empty.
+    *
+    * @param children must be empty
+    * @return a clone of this scan
+    */
+   @Override
+   @JsonIgnore
+   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+     Preconditions.checkArgument(children.isEmpty());
+     return new BinaryTableGroupScan(this);
+   }
+
+   /** Returns a newly created HBase configuration. */
+   @JsonIgnore
+   public Configuration getHBaseConf() {
+     return HBaseConfiguration.create();
+   }
+
+   /** Returns the name of the table this scan reads, taken from the scan spec. */
+   @JsonIgnore
+   public String getTableName() {
+     return getHBaseScanSpec().getTableName();
+   }
+
+   @Override
+   public String toString() {
+     return "BinaryTableGroupScan [ScanSpec="
+         + hbaseScanSpec + ", columns="
+         + columns + "]";
+   }
+
+   /** Returns the scan spec this group scan was created from. */
+   @JsonProperty
+   public HBaseScanSpec getHBaseScanSpec() {
+     return hbaseScanSpec;
+   }
+
+ }