You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2016/09/13 01:32:24 UTC
[37/50] [abbrv] drill git commit: Refactoring code for better
organization.
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
new file mode 100644
index 0000000..7fbcd1b
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -0,0 +1,569 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import static org.ojai.DocumentConstants.ID_KEY;
+import io.netty.buffer.DrillBuf;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
+import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
+import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
+import org.ojai.DocumentReader;
+import org.ojai.DocumentReader.EventType;
+import org.ojai.DocumentStream;
+import org.ojai.FieldPath;
+import org.ojai.FieldSegment;
+import org.ojai.Value;
+import org.ojai.store.QueryCondition;
+import org.ojai.types.OTime;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.mapr.db.MapRDB;
+import com.mapr.db.Table;
+import com.mapr.db.Table.TableOption;
+import com.mapr.db.exceptions.DBException;
+import com.mapr.db.impl.IdCodec;
+import com.mapr.db.ojai.DBDocumentReaderBase;
+import com.mapr.db.util.ByteBufs;
+import com.mapr.org.apache.hadoop.hbase.util.Bytes;
+
+public class MaprDBJsonRecordReader extends AbstractRecordReader {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MaprDBJsonRecordReader.class);
+
+ public static final SchemaPath ID_PATH = SchemaPath.getSimplePath(ID_KEY);
+
+ private Table table;
+ private QueryCondition condition;
+ private FieldPath[] projectedFields;
+
+ private final String tableName;
+ private OperatorContext operatorContext;
+ private VectorContainerWriter writer;
+
+ private DrillBuf buffer;
+
+ private DocumentStream documentStream;
+
+ private Iterator<DocumentReader> documentReaderIterators;
+
+ private boolean includeId;
+ private boolean idOnly;
+ private final boolean unionEnabled;
+ private final boolean readNumbersAsDouble;
+ private final boolean allTextMode;
+ private final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24;
+
+ public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
+ MapRDBFormatPluginConfig formatPluginConfig,
+ List<SchemaPath> projectedColumns, FragmentContext context) {
+ buffer = context.getManagedBuffer();
+ projectedFields = null;
+ tableName = Preconditions.checkNotNull(subScanSpec, "MapRDB reader needs a sub-scan spec").getTableName();
+ documentReaderIterators = null;
+ includeId = false;
+ idOnly = false;
+ byte[] serializedFilter = subScanSpec.getSerializedFilter();
+ condition = null;
+
+ if (serializedFilter != null) {
+ condition = com.mapr.db.impl.ConditionImpl.parseFrom(ByteBufs.wrap(serializedFilter));
+ }
+
+ setColumns(projectedColumns);
+ unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+ readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble();
+ allTextMode = formatPluginConfig.isAllTextMode();
+ }
+
+ @Override
+ protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) {
+ Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+ if (!isStarQuery()) {
+ Set<FieldPath> projectedFieldsSet = Sets.newTreeSet();
+ for (SchemaPath column : columns) {
+ if (column.getRootSegment().getPath().equalsIgnoreCase(ID_KEY)) {
+ /*
+ * we do not include _id field in the set of projected fields
+ * because the DB currently can not return a document if only
+ * the _id field was projected. This should really be fixed in
+ * the DB client (Bug 21708) to avoid transferring the entire
+ * document when only _id is requested.
+ */
+ // projectedFieldsList.add(ID_FIELD);
+ includeId = true;
+ } else {
+ projectedFieldsSet.add(getFieldPathForProjection(column));
+ }
+ transformed.add(column);
+ }
+ if (projectedFieldsSet.size() > 0) {
+ projectedFields = projectedFieldsSet.toArray(new FieldPath[projectedFieldsSet.size()]);
+ }
+ } else {
+ transformed.add(AbstractRecordReader.STAR_COLUMN);
+ includeId = true;
+ }
+
+ /*
+ * (Bug 21708) if we are projecting only the id field, save that condition here.
+ */
+ idOnly = !isStarQuery() && (projectedFields == null);
+ return transformed;
+ }
+
+ @Override
+ public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+ this.writer = new VectorContainerWriter(output, unionEnabled);
+ this.operatorContext = context;
+
+ try {
+ table = MapRDB.getTable(tableName);
+ table.setOption(TableOption.EXCLUDEID, !includeId);
+ documentStream = table.find(condition, projectedFields);
+ documentReaderIterators = documentStream.documentReaders().iterator();
+ } catch (DBException e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ @Override
+ public int next() {
+ Stopwatch watch = Stopwatch.createUnstarted();
+ watch.start();
+
+ writer.allocate();
+ writer.reset();
+
+ int recordCount = 0;
+ DBDocumentReaderBase reader = null;
+
+ while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) {
+ try {
+ reader = nextDocumentReader();
+ if (reader == null) break;
+ writer.setPosition(recordCount);
+ if (idOnly) {
+ Value id = reader.getId();
+ MapWriter map = writer.rootAsMap();
+
+ try {
+ switch(id.getType()) {
+ case STRING:
+ writeString(map.varChar(ID_KEY), id.getString());
+ recordCount++;
+ break;
+ case BINARY:
+ if (allTextMode) {
+ writeString(map.varChar(ID_KEY), new String(id.getBinary().array(), Charset.forName("UTF-8")));
+ } else {
+ writeBinary(map.varBinary(ID_KEY), id.getBinary());
+ }
+ recordCount++;
+ break;
+ default:
+ throw new UnsupportedOperationException(id.getType() +
+ " is not a supported type for _id field.");
+ }
+ } catch (IllegalStateException | IllegalArgumentException e) {
+ logger.warn(String.format("Possible schema change at _id: '%s'",
+ IdCodec.asString(id)), e);
+ }
+ } else {
+ if (reader.next() != EventType.START_MAP) {
+ throw dataReadError("The document did not start with START_MAP!");
+ }
+ writeToMap(reader, writer.rootAsMap());
+ recordCount++;
+ }
+ } catch (UserException e) {
+ throw UserException.unsupportedError(e)
+ .addContext(String.format("Table: %s, document id: '%s'",
+ table.getPath(),
+ reader == null ? null : IdCodec.asString(reader.getId())))
+ .build(logger);
+ }
+ }
+
+ writer.setValueCount(recordCount);
+ logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), recordCount);
+ return recordCount;
+ }
+
+ private void writeToMap(DBDocumentReaderBase reader, MapWriter map) {
+ map.start();
+ outside: while (true) {
+ EventType event = reader.next();
+ if (event == null || event == EventType.END_MAP) break outside;
+
+ String fieldName = reader.getFieldName();
+ try {
+ switch (event) {
+ case NULL:
+ break; // not setting the field will leave it as null
+ case BINARY:
+ if (allTextMode) {
+ writeString(map.varChar(fieldName), new String(reader.getBinary().array(), Charset.forName("UTF-8")));
+ } else {
+ writeBinary(map.varBinary(fieldName), reader.getBinary());
+ }
+ break;
+ case BOOLEAN:
+ if (allTextMode) {
+ writeString(map.varChar(fieldName), String.valueOf(reader.getBoolean()));
+ } else {
+ map.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0);
+ }
+ break;
+ case STRING:
+ writeString(map.varChar(fieldName), reader.getString());
+ break;
+ case BYTE:
+ if (allTextMode) {
+ writeString(map.varChar(fieldName), String.valueOf(reader.getByte()));
+ } else if (readNumbersAsDouble) {
+ map.float8(fieldName).writeFloat8(reader.getByte());
+ } else {
+ map.tinyInt(fieldName).writeTinyInt(reader.getByte());
+ }
+ break;
+ case SHORT:
+ if (allTextMode) {
+ writeString(map.varChar(fieldName), String.valueOf(reader.getShort()));
+ } else if (readNumbersAsDouble) {
+ map.float8(fieldName).writeFloat8(reader.getShort());
+ } else {
+ map.smallInt(fieldName).writeSmallInt(reader.getShort());
+ }
+ break;
+ case INT:
+ if (allTextMode) {
+ writeString(map.varChar(fieldName), String.valueOf(reader.getInt()));
+ } else if (readNumbersAsDouble) {
+ map.float8(fieldName).writeFloat8(reader.getInt());
+ } else {
+ map.integer(fieldName).writeInt(reader.getInt());
+ }
+ break;
+ case LONG:
+ if (allTextMode) {
+ writeString(map.varChar(fieldName), String.valueOf(reader.getLong()));
+ } else if (readNumbersAsDouble) {
+ map.float8(fieldName).writeFloat8(reader.getLong());
+ } else {
+ map.bigInt(fieldName).writeBigInt(reader.getLong());
+ }
+ break;
+ case FLOAT:
+ if (allTextMode) {
+ writeString(map.varChar(fieldName), String.valueOf(reader.getFloat()));
+ } else if (readNumbersAsDouble) {
+ map.float8(fieldName).writeFloat8(reader.getFloat());
+ } else {
+ map.float4(fieldName).writeFloat4(reader.getFloat());
+ }
+ break;
+ case DOUBLE:
+ if (allTextMode) {
+ writeString(map.varChar(fieldName), String.valueOf(reader.getDouble()));
+ } else {
+ map.float8(fieldName).writeFloat8(reader.getDouble());
+ }
+ break;
+ case DECIMAL:
+ throw unsupportedError("Decimal type is currently not supported.");
+ case DATE:
+ if (allTextMode) {
+ writeString(map.varChar(fieldName), reader.getDate().toString());
+ } else {
+
+ long milliSecondsSinceEpoch = reader.getDate().toDaysSinceEpoch() * MILLISECONDS_IN_A_DAY;
+ map.date(fieldName).writeDate(milliSecondsSinceEpoch);
+ }
+ break;
+ case TIME:
+ if (allTextMode) {
+ writeString(map.varChar(fieldName), reader.getTime().toString());
+ } else {
+ OTime t = reader.getTime();
+ int h = t.getHour();
+ int m = t.getMinute();
+ int s = t.getSecond();
+ int ms = t.getMilliSecond();
+ int millisOfDay = ms + (s + ((m + (h * 60)) * 60)) * 1000;
+ map.time(fieldName).writeTime(millisOfDay);
+ }
+ break;
+ case TIMESTAMP:
+ if (allTextMode) {
+ writeString(map.varChar(fieldName), reader.getTimestamp().toString());
+ } else {
+ map.timeStamp(fieldName).writeTimeStamp(reader.getTimestampLong());
+ }
+ break;
+ case INTERVAL:
+ throw unsupportedError("Interval type is currently not supported.");
+ case START_MAP:
+ writeToMap(reader, map.map(fieldName));
+ break;
+ case START_ARRAY:
+ writeToList(reader, map.list(fieldName));
+ break;
+ case END_ARRAY:
+ throw dataReadError("Encountered an END_ARRAY event inside a map.");
+ default:
+ throw unsupportedError("Unsupported type: %s encountered during the query.", event);
+ }
+ } catch (IllegalStateException | IllegalArgumentException e) {
+ logger.warn(String.format("Possible schema change at _id: '%s', field: '%s'",
+ IdCodec.asString(reader.getId()), fieldName), e);
+ }
+ }
+ map.end();
+ }
+
+ private void writeToList(DBDocumentReaderBase reader, ListWriter list) {
+ list.startList();
+ outside: while (true) {
+ EventType event = reader.next();
+ if (event == null || event == EventType.END_ARRAY) break outside;
+
+ switch (event) {
+ case NULL:
+ throw unsupportedError("Null values are not supported in lists.");
+ case BINARY:
+ if (allTextMode) {
+ writeString(list.varChar(), new String(reader.getBinary().array(), Charset.forName("UTF-8")));
+ } else {
+ writeBinary(list.varBinary(), reader.getBinary());
+ }
+ break;
+ case BOOLEAN:
+ if (allTextMode) {
+ writeString(list.varChar(), String.valueOf(reader.getBoolean()));
+ } else {
+ list.bit().writeBit(reader.getBoolean() ? 1 : 0);
+ }
+ break;
+ case STRING:
+ writeString(list.varChar(), reader.getString());
+ break;
+ case BYTE:
+ if (allTextMode) {
+ writeString(list.varChar(), String.valueOf(reader.getByte()));
+ } else if (readNumbersAsDouble) {
+ list.float8().writeFloat8(reader.getByte());
+ } else {
+ list.tinyInt().writeTinyInt(reader.getByte());
+ }
+ break;
+ case SHORT:
+ if (allTextMode) {
+ writeString(list.varChar(), String.valueOf(reader.getShort()));
+ } else if (readNumbersAsDouble) {
+ list.float8().writeFloat8(reader.getShort());
+ } else {
+ list.smallInt().writeSmallInt(reader.getShort());
+ }
+ break;
+ case INT:
+ if (allTextMode) {
+ writeString(list.varChar(), String.valueOf(reader.getInt()));
+ } else if (readNumbersAsDouble) {
+ list.float8().writeFloat8(reader.getInt());
+ } else {
+ list.integer().writeInt(reader.getInt());
+ }
+ break;
+ case LONG:
+ if (allTextMode) {
+ writeString(list.varChar(), String.valueOf(reader.getLong()));
+ } else if (readNumbersAsDouble) {
+ list.float8().writeFloat8(reader.getLong());
+ } else {
+ list.bigInt().writeBigInt(reader.getLong());
+ }
+ break;
+ case FLOAT:
+ if (allTextMode) {
+ writeString(list.varChar(), String.valueOf(reader.getFloat()));
+ } else if (readNumbersAsDouble) {
+ list.float8().writeFloat8(reader.getFloat());
+ } else {
+ list.float4().writeFloat4(reader.getFloat());
+ }
+ break;
+ case DOUBLE:
+ if (allTextMode) {
+ writeString(list.varChar(), String.valueOf(reader.getDouble()));
+ } else {
+ list.float8().writeFloat8(reader.getDouble());
+ }
+ break;
+ case DECIMAL:
+ throw unsupportedError("Decimals are currently not supported.");
+ case DATE:
+ if (allTextMode) {
+ writeString(list.varChar(), reader.getDate().toString());
+ } else {
+ long milliSecondsSinceEpoch = reader.getDate().toDaysSinceEpoch() * MILLISECONDS_IN_A_DAY;
+ list.date().writeDate(milliSecondsSinceEpoch);
+ }
+ break;
+ case TIME:
+ if (allTextMode) {
+ writeString(list.varChar(), reader.getTime().toString());
+ } else {
+ OTime t = reader.getTime();
+ int h = t.getHour();
+ int m = t.getMinute();
+ int s = t.getSecond();
+ int ms = t.getMilliSecond();
+ int millisOfDay = ms + (s + ((m + (h * 60)) * 60)) * 1000;
+ list.time().writeTime(millisOfDay);
+ }
+ break;
+ case TIMESTAMP:
+ if (allTextMode) {
+ writeString(list.varChar(), reader.getTimestamp().toString());
+ } else {
+ list.timeStamp().writeTimeStamp(reader.getTimestampLong());
+ }
+ break;
+ case INTERVAL:
+ throw unsupportedError("Interval is currently not supported.");
+ case START_MAP:
+ writeToMap(reader, list.map());
+ break;
+ case END_MAP:
+ throw dataReadError("Encountered an END_MAP event inside a list.");
+ case START_ARRAY:
+ writeToList(reader, list.list());
+ break;
+ default:
+ throw unsupportedError("Unsupported type: %s encountered during the query.%s", event);
+ }
+ }
+ list.endList();
+ }
+
+ private void writeBinary(VarBinaryWriter binaryWriter, ByteBuffer buf) {
+ buffer = buffer.reallocIfNeeded(buf.remaining());
+ buffer.setBytes(0, buf, buf.position(), buf.remaining());
+ binaryWriter.writeVarBinary(0, buf.remaining(), buffer);
+ }
+
+ private void writeString(VarCharWriter varCharWriter, String string) {
+ final byte[] strBytes = Bytes.toBytes(string);
+ buffer = buffer.reallocIfNeeded(strBytes.length);
+ buffer.setBytes(0, strBytes);
+ varCharWriter.writeVarChar(0, strBytes.length, buffer);
+ }
+
+ private UserException unsupportedError(String format, Object... args) {
+ return UserException.unsupportedError()
+ .message(String.format(format, args))
+ .build(logger);
+ }
+
+ private UserException dataReadError(String format, Object... args) {
+ return UserException.dataReadError()
+ .message(String.format(format, args))
+ .build(logger);
+ }
+
+ private DBDocumentReaderBase nextDocumentReader() {
+ final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats();
+ try {
+ if (operatorStats != null) {
+ operatorStats.startWait();
+ }
+ try {
+ if (!documentReaderIterators.hasNext()) {
+ return null;
+ } else {
+ return (DBDocumentReaderBase) documentReaderIterators.next();
+ }
+ } finally {
+ if (operatorStats != null) {
+ operatorStats.stopWait();
+ }
+ }
+ } catch (DBException e) {
+ throw UserException.dataReadError(e).build(logger);
+ }
+ }
+
+ /*
+ * Extracts contiguous named segments from the SchemaPath, starting from the
+ * root segment and build the FieldPath from it for projection.
+ *
+ * This is due to bug 22726 and 22727, which cause DB's DocumentReaders to
+ * behave incorrectly for sparse lists, hence we avoid projecting beyond the
+ * first encountered ARRAY field and let Drill handle the projection.
+ */
+ private static FieldPath getFieldPathForProjection(SchemaPath column) {
+ Stack<PathSegment.NameSegment> pathSegments = new Stack<PathSegment.NameSegment>();
+ PathSegment seg = column.getRootSegment();
+ while (seg != null && seg.isNamed()) {
+ pathSegments.push((PathSegment.NameSegment) seg);
+ seg = seg.getChild();
+ }
+ FieldSegment.NameSegment child = null;
+ while (!pathSegments.isEmpty()) {
+ child = new FieldSegment.NameSegment(pathSegments.pop().getPath(), child, false);
+ }
+ return new FieldPath(child);
+ }
+
+ @Override
+ public void close() {
+ if (documentStream != null) {
+ documentStream.close();
+ }
+ if (table != null) {
+ table.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/util/CommonFns.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/util/CommonFns.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/util/CommonFns.java
new file mode 100644
index 0000000..a7b8cd1
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/util/CommonFns.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.util;
+
+public class CommonFns {
+
+ public static boolean isNullOrEmpty(final byte[] key) {
+ return key == null || key.length == 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatMatcher.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatMatcher.java
new file mode 100644
index 0000000..47e9927
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatMatcher.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.streams;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.store.mapr.TableFormatMatcher;
+import org.apache.drill.exec.store.mapr.TableFormatPlugin;
+
+import com.mapr.fs.MapRFileStatus;
+
+public class StreamsFormatMatcher extends TableFormatMatcher {
+
+ public StreamsFormatMatcher(TableFormatPlugin plugin) {
+ super(plugin);
+ }
+
+ @Override
+ protected boolean isSupportedTable(MapRFileStatus status) throws IOException {
+ return getFormatPlugin()
+ .getMaprFS()
+ .getTableProperties(status.getPath())
+ .getAttr()
+ .getIsMarlinTable();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
new file mode 100644
index 0000000..811e245
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.streams;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.mapr.TableFormatPlugin;
+import org.apache.hadoop.conf.Configuration;
+
+public class StreamsFormatPlugin extends TableFormatPlugin {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamsFormatPlugin.class);
+ private StreamsFormatMatcher matcher;
+
+ public StreamsFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+ StoragePluginConfig storageConfig, StreamsFormatPluginConfig formatConfig) {
+ super(name, context, fsConf, storageConfig, formatConfig);
+ matcher = new StreamsFormatMatcher(this);
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsWrite() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsAutoPartitioning() {
+ return false;
+ }
+
+ @Override
+ public FormatMatcher getMatcher() {
+ return matcher;
+ }
+
+ @Override
+ public AbstractWriter getWriter(PhysicalOperator child, String location,
+ List<String> partitionColumns) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+ List<SchemaPath> columns) throws IOException {
+ List<String> files = selection.getFiles();
+ assert (files.size() == 1);
+ //TableProperties props = getMaprFS().getTableProperties(new Path(files.get(0)));
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPluginConfig.java
new file mode 100644
index 0000000..b061f03
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPluginConfig.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.streams;
+
+import org.apache.drill.exec.store.mapr.TableFormatPluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("streams") @JsonInclude(Include.NON_DEFAULT)
+public class StreamsFormatPluginConfig extends TableFormatPluginConfig {
+
+ @Override
+ public int hashCode() {
+ return 47;
+ }
+
+ @Override
+ protected boolean impEquals(Object obj) {
+ return true; // TODO: compare custom properties once added
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java
deleted file mode 100644
index fa084f0..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import java.io.IOException;
-
-import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.planner.logical.DynamicDrillTable;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.dfs.FormatMatcher;
-import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.dfs.FormatSelection;
-import org.apache.hadoop.fs.FileStatus;
-
-import com.mapr.fs.MapRFileStatus;
-
-public class MapRDBFormatMatcher extends FormatMatcher {
-
- private final FormatPlugin plugin;
-
- public MapRDBFormatMatcher(FormatPlugin plugin) {
- this.plugin = plugin;
- }
-
- @Override
- public boolean supportDirectoryReads() {
- return false;
- }
-
- public DrillTable isReadable(DrillFileSystem fs,
- FileSelection selection, FileSystemPlugin fsPlugin,
- String storageEngineName, String userName) throws IOException {
- FileStatus status = selection.getFirstPath(fs);
- if (!isFileReadable(fs, status)) {
- return null;
- }
-
- return new DynamicDrillTable(fsPlugin, storageEngineName, userName,
- new FormatSelection(getFormatPlugin().getConfig(), selection));
- }
-
- @Override
- public boolean isFileReadable(DrillFileSystem fs, FileStatus status) throws IOException {
- return (status instanceof MapRFileStatus) && ((MapRFileStatus) status).isTable();
- }
-
- @Override
- public FormatPlugin getFormatPlugin() {
- return plugin;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
deleted file mode 100644
index 0694f5b..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import static com.mapr.fs.jni.MapRConstants.MAPRFS_PREFIX;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.AbstractWriter;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.dfs.FormatMatcher;
-import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.hbase.HBaseScanSpec;
-import org.apache.drill.exec.store.maprdb.binary.BinaryTableGroupScan;
-import org.apache.drill.exec.store.maprdb.json.JsonScanSpec;
-import org.apache.drill.exec.store.maprdb.json.JsonTableGroupScan;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.collect.ImmutableSet;
-import com.mapr.fs.MapRFileSystem;
-import com.mapr.fs.tables.TableProperties;
-
-public class MapRDBFormatPlugin implements FormatPlugin {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
- .getLogger(MapRDBFormatPlugin.class);
-
- private final FileSystemConfig storageConfig;
- private final MapRDBFormatPluginConfig config;
- private final MapRDBFormatMatcher matcher;
- private final Configuration fsConf;
- private final DrillbitContext context;
- private final String name;
-
- private volatile FileSystemPlugin storagePlugin;
- private volatile MapRFileSystem maprfs;
-
- public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
- StoragePluginConfig storageConfig) {
- this(name, context, fsConf, storageConfig, new MapRDBFormatPluginConfig());
- }
-
- public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
- StoragePluginConfig storageConfig, MapRDBFormatPluginConfig formatConfig) {
- this.context = context;
- this.config = formatConfig;
- this.matcher = new MapRDBFormatMatcher(this);
- this.storageConfig = (FileSystemConfig) storageConfig;
- this.fsConf = fsConf;
- this.name = name == null ? "maprdb" : name;
- try {
- this.maprfs = new MapRFileSystem();
- maprfs.initialize(new URI(MAPRFS_PREFIX), fsConf);
- } catch (IOException | URISyntaxException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public boolean supportsRead() {
- return true;
- }
-
- @Override
- public boolean supportsWrite() {
- return false;
- }
-
- @Override
- public boolean supportsAutoPartitioning() {
- return false;
- }
-
- @Override
- public FormatMatcher getMatcher() {
- return matcher;
- }
-
- public Configuration getFsConf() {
- return fsConf;
- }
-
- @Override
- public AbstractWriter getWriter(PhysicalOperator child, String location,
- List<String> partitionColumns) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- @JsonIgnore
- public Set<StoragePluginOptimizerRule> getOptimizerRules() {
- return ImmutableSet.of(MapRDBPushFilterIntoScan.FILTER_ON_SCAN, MapRDBPushFilterIntoScan.FILTER_ON_PROJECT);
- }
-
- @Override
- public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
- List<SchemaPath> columns) throws IOException {
- List<String> files = selection.getFiles();
- assert (files.size() == 1);
- String tableName = files.get(0);
- TableProperties props = maprfs.getTableProperties(new Path(tableName));
-
- if (props.getAttr().getJson()) {
- JsonScanSpec scanSpec = new JsonScanSpec(tableName, null/*condition*/);
- return new JsonTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns);
- } else {
- HBaseScanSpec scanSpec = new HBaseScanSpec(tableName);
- return new BinaryTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns);
- }
- }
-
- @Override
- public FormatPluginConfig getConfig() {
- return config;
- }
-
- @Override
- public StoragePluginConfig getStorageConfig() {
- return storageConfig;
- }
-
- @Override
- public DrillbitContext getContext() {
- return context;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- public synchronized FileSystemPlugin getStoragePlugin() {
- if (this.storagePlugin == null) {
- try {
- this.storagePlugin = (FileSystemPlugin) (context.getStorage().getPlugin(storageConfig));
- } catch (ExecutionSetupException e) {
- throw new RuntimeException(e);
- }
- }
- return storagePlugin;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPluginConfig.java
deleted file mode 100644
index eb341d9..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPluginConfig.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import org.apache.drill.common.logical.FormatPluginConfig;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("maprdb") @JsonInclude(Include.NON_DEFAULT)
-public class MapRDBFormatPluginConfig implements FormatPluginConfig {
-
- private boolean allTextMode = false;
- private boolean readAllNumbersAsDouble = false;
-
- @Override
- public int hashCode() {
- return 53;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (obj == null) {
- return false;
- }
-
- if (getClass() != obj.getClass()) {
- return false;
- }
-
- MapRDBFormatPluginConfig other = (MapRDBFormatPluginConfig)obj;
-
- if (readAllNumbersAsDouble != other.readAllNumbersAsDouble) {
- return false;
- }
-
- if (allTextMode != other.allTextMode) {
- return false;
- }
-
- return true;
- }
-
- public boolean isReadAllNumbersAsDouble() {
- return readAllNumbersAsDouble;
- }
-
- public boolean isAllTextMode() {
- return allTextMode;
- }
-
- @JsonProperty("allTextMode")
- public void setAllTextMode(boolean mode) {
- allTextMode = mode;
- }
-
- @JsonProperty("readAllNumbersAsDouble")
- public void setReadAllNumbersAsDouble(boolean read) {
- readAllNumbersAsDouble = read;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java
deleted file mode 100644
index 393bfe5..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-public abstract class MapRDBGroupScan extends AbstractGroupScan {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class);
-
- private FileSystemPlugin storagePlugin;
-
- private MapRDBFormatPlugin formatPlugin;
-
- protected MapRDBFormatPluginConfig formatPluginConfig;
-
- protected List<SchemaPath> columns;
-
- protected Map<Integer, List<MapRDBSubScanSpec>> endpointFragmentMapping;
-
- protected NavigableMap<TabletFragmentInfo, String> regionsToScan;
-
- private boolean filterPushedDown = false;
-
- private Stopwatch watch = Stopwatch.createUnstarted();
-
- private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MapRDBSubScanSpec>>() {
- @Override
- public int compare(List<MapRDBSubScanSpec> list1, List<MapRDBSubScanSpec> list2) {
- return list1.size() - list2.size();
- }
- };
-
- private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
-
- public MapRDBGroupScan(MapRDBGroupScan that) {
- super(that);
- this.columns = that.columns;
- this.formatPlugin = that.formatPlugin;
- this.formatPluginConfig = that.formatPluginConfig;
- this.storagePlugin = that.storagePlugin;
- this.regionsToScan = that.regionsToScan;
- this.filterPushedDown = that.filterPushedDown;
- }
-
- public MapRDBGroupScan(FileSystemPlugin storagePlugin,
- MapRDBFormatPlugin formatPlugin, List<SchemaPath> columns, String userName) {
- super(userName);
- this.storagePlugin = storagePlugin;
- this.formatPlugin = formatPlugin;
- this.formatPluginConfig = (MapRDBFormatPluginConfig)formatPlugin.getConfig();
- this.columns = columns;
- }
-
- @Override
- public List<EndpointAffinity> getOperatorAffinity() {
- watch.reset();
- watch.start();
- Map<String, DrillbitEndpoint> endpointMap = new HashMap<String, DrillbitEndpoint>();
- for (DrillbitEndpoint ep : formatPlugin.getContext().getBits()) {
- endpointMap.put(ep.getAddress(), ep);
- }
-
- Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
- for (String serverName : regionsToScan.values()) {
- DrillbitEndpoint ep = endpointMap.get(serverName);
- if (ep != null) {
- EndpointAffinity affinity = affinityMap.get(ep);
- if (affinity == null) {
- affinityMap.put(ep, new EndpointAffinity(ep, 1));
- } else {
- affinity.addAffinity(1);
- }
- }
- }
- logger.debug("Took {} �s to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS)/1000);
- return Lists.newArrayList(affinityMap.values());
- }
-
- /**
- *
- * @param incomingEndpoints
- */
- @Override
- public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
- watch.reset();
- watch.start();
-
- final int numSlots = incomingEndpoints.size();
- Preconditions.checkArgument(numSlots <= regionsToScan.size(),
- String.format("Incoming endpoints %d is greater than number of scan regions %d", numSlots, regionsToScan.size()));
-
- /*
- * Minimum/Maximum number of assignment per slot
- */
- final int minPerEndpointSlot = (int) Math.floor((double)regionsToScan.size() / numSlots);
- final int maxPerEndpointSlot = (int) Math.ceil((double)regionsToScan.size() / numSlots);
-
- /*
- * initialize (endpoint index => HBaseSubScanSpec list) map
- */
- endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots);
-
- /*
- * another map with endpoint (hostname => corresponding index list) in 'incomingEndpoints' list
- */
- Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
-
- /*
- * Initialize these two maps
- */
- for (int i = 0; i < numSlots; ++i) {
- endpointFragmentMapping.put(i, new ArrayList<MapRDBSubScanSpec>(maxPerEndpointSlot));
- String hostname = incomingEndpoints.get(i).getAddress();
- Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
- if (hostIndexQueue == null) {
- hostIndexQueue = Lists.newLinkedList();
- endpointHostIndexListMap.put(hostname, hostIndexQueue);
- }
- hostIndexQueue.add(i);
- }
-
- Set<Entry<TabletFragmentInfo, String>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet());
-
- /*
- * First, we assign regions which are hosted on region servers running on drillbit endpoints
- */
- for (Iterator<Entry<TabletFragmentInfo, String>> regionsIterator = regionsToAssignSet.iterator(); regionsIterator.hasNext(); /*nothing*/) {
- Entry<TabletFragmentInfo, String> regionEntry = regionsIterator.next();
- /*
- * Test if there is a drillbit endpoint which is also an HBase RegionServer that hosts the current HBase region
- */
- Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue());
- if (endpointIndexlist != null) {
- Integer slotIndex = endpointIndexlist.poll();
- List<MapRDBSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex);
- endpointSlotScanList.add(getSubScanSpec(regionEntry.getKey()));
- // add to the tail of the slot list, to add more later in round robin fashion
- endpointIndexlist.offer(slotIndex);
- // this region has been assigned
- regionsIterator.remove();
- }
- }
-
- /*
- * Build priority queues of slots, with ones which has tasks lesser than 'minPerEndpointSlot' and another which have more.
- */
- PriorityQueue<List<MapRDBSubScanSpec>> minHeap = new PriorityQueue<List<MapRDBSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
- PriorityQueue<List<MapRDBSubScanSpec>> maxHeap = new PriorityQueue<List<MapRDBSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
- for(List<MapRDBSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
- if (listOfScan.size() < minPerEndpointSlot) {
- minHeap.offer(listOfScan);
- } else if (listOfScan.size() > minPerEndpointSlot){
- maxHeap.offer(listOfScan);
- }
- }
-
- /*
- * Now, let's process any regions which remain unassigned and assign them to slots with minimum number of assignments.
- */
- if (regionsToAssignSet.size() > 0) {
- for (Entry<TabletFragmentInfo, String> regionEntry : regionsToAssignSet) {
- List<MapRDBSubScanSpec> smallestList = minHeap.poll();
- smallestList.add(getSubScanSpec(regionEntry.getKey()));
- if (smallestList.size() < maxPerEndpointSlot) {
- minHeap.offer(smallestList);
- }
- }
- }
-
- /*
- * While there are slots with lesser than 'minPerEndpointSlot' unit work, balance from those with more.
- */
- while(minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) {
- List<MapRDBSubScanSpec> smallestList = (List<MapRDBSubScanSpec>) minHeap.poll();
- List<MapRDBSubScanSpec> largestList = (List<MapRDBSubScanSpec>) maxHeap.poll();
- smallestList.add(largestList.remove(largestList.size()-1));
- if (largestList.size() > minPerEndpointSlot) {
- maxHeap.offer(largestList);
- }
- if (smallestList.size() < minPerEndpointSlot) {
- minHeap.offer(smallestList);
- }
- }
-
- /* no slot should be empty at this point */
- assert (minHeap.peek() == null || minHeap.peek().size() > 0) : String.format(
- "Unable to assign tasks to some endpoints.\nEndpoints: {}.\nAssignment Map: {}.",
- incomingEndpoints, endpointFragmentMapping.toString());
-
- logger.debug("Built assignment map in {} �s.\nEndpoints: {}.\nAssignment Map: {}",
- watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString());
- }
-
- @Override
- public int getMaxParallelizationWidth() {
- return regionsToScan.size();
- }
-
- @JsonIgnore
- public MapRDBFormatPlugin getFormatPlugin() {
- return formatPlugin;
- }
-
- @Override
- public String getDigest() {
- return toString();
- }
-
- @JsonProperty("storage")
- public FileSystemConfig getStorageConfig() {
- return (FileSystemConfig) storagePlugin.getConfig();
- }
-
- @JsonIgnore
- public FileSystemPlugin getStoragePlugin(){
- return storagePlugin;
- }
-
- @JsonProperty
- public List<SchemaPath> getColumns() {
- return columns;
- }
-
- @JsonIgnore
- public boolean canPushdownProjects(List<SchemaPath> columns) {
- return true;
- }
-
- @JsonIgnore
- public void setFilterPushedDown(boolean b) {
- this.filterPushedDown = true;
- }
-
- @JsonIgnore
- public boolean isFilterPushedDown() {
- return filterPushedDown;
- }
-
- protected abstract MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo key);
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
deleted file mode 100644
index c0a33bf..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rex.RexNode;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.exec.planner.logical.DrillOptiq;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
-import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.drill.exec.planner.physical.FilterPrel;
-import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.planner.physical.ProjectPrel;
-import org.apache.drill.exec.planner.physical.ScanPrel;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.hbase.HBaseScanSpec;
-import org.apache.drill.exec.store.maprdb.binary.BinaryTableGroupScan;
-import org.apache.drill.exec.store.maprdb.binary.MapRDBFilterBuilder;
-import org.apache.drill.exec.store.maprdb.json.JsonConditionBuilder;
-import org.apache.drill.exec.store.maprdb.json.JsonScanSpec;
-import org.apache.drill.exec.store.maprdb.json.JsonTableGroupScan;
-
-import com.google.common.collect.ImmutableList;
-
-public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushFilterIntoScan.class);
-
- private MapRDBPushFilterIntoScan(RelOptRuleOperand operand, String description) {
- super(operand, description);
- }
-
- public static final StoragePluginOptimizerRule FILTER_ON_SCAN = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushFilterIntoScan:Filter_On_Scan") {
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final ScanPrel scan = (ScanPrel) call.rel(1);
- final FilterPrel filter = (FilterPrel) call.rel(0);
- final RexNode condition = filter.getCondition();
-
- if (scan.getGroupScan() instanceof BinaryTableGroupScan) {
- BinaryTableGroupScan groupScan = (BinaryTableGroupScan)scan.getGroupScan();
- doPushFilterIntoBinaryGroupScan(call, filter, null, scan, groupScan, condition);
- } else {
- assert(scan.getGroupScan() instanceof JsonTableGroupScan);
- JsonTableGroupScan groupScan = (JsonTableGroupScan)scan.getGroupScan();
- doPushFilterIntoJsonGroupScan(call, filter, null, scan, groupScan, condition);
- }
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- final ScanPrel scan = (ScanPrel) call.rel(1);
- if (scan.getGroupScan() instanceof BinaryTableGroupScan ||
- scan.getGroupScan() instanceof JsonTableGroupScan) {
- return super.matches(call);
- }
- return false;
- }
- };
-
- public static final StoragePluginOptimizerRule FILTER_ON_PROJECT = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))), "MapRDBPushFilterIntoScan:Filter_On_Project") {
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final ScanPrel scan = (ScanPrel) call.rel(2);
- final ProjectPrel project = (ProjectPrel) call.rel(1);
- final FilterPrel filter = (FilterPrel) call.rel(0);
-
- // convert the filter to one that references the child of the project
- final RexNode condition = RelOptUtil.pushFilterPastProject(filter.getCondition(), project);
-
- if (scan.getGroupScan() instanceof BinaryTableGroupScan) {
- BinaryTableGroupScan groupScan = (BinaryTableGroupScan)scan.getGroupScan();
- doPushFilterIntoBinaryGroupScan(call, filter, project, scan, groupScan, condition);
- } else {
- assert(scan.getGroupScan() instanceof JsonTableGroupScan);
- JsonTableGroupScan groupScan = (JsonTableGroupScan)scan.getGroupScan();
- doPushFilterIntoJsonGroupScan(call, filter, project, scan, groupScan, condition);
- }
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- final ScanPrel scan = (ScanPrel) call.rel(2);
- if (scan.getGroupScan() instanceof BinaryTableGroupScan ||
- scan.getGroupScan() instanceof JsonTableGroupScan) {
- return super.matches(call);
- }
- return false;
- }
- };
-
- protected void doPushFilterIntoJsonGroupScan(RelOptRuleCall call,
- FilterPrel filter, final ProjectPrel project, ScanPrel scan,
- JsonTableGroupScan groupScan, RexNode condition) {
-
- if (groupScan.isFilterPushedDown()) {
- /*
- * The rule can get triggered again due to the transformed "scan => filter" sequence
- * created by the earlier execution of this rule when we could not do a complete
- * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
- * this flag to not do a re-processing of the rule on the already transformed call.
- */
- return;
- }
-
- LogicalExpression conditionExp = null;
- try {
- conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
- } catch (ClassCastException e) {
- // MD-771 bug in DrillOptiq.toDrill() causes filter condition on ITEM operator to throw ClassCastException
- // For such cases, we return without pushdown
- return;
- }
- final JsonConditionBuilder jsonConditionBuilder = new JsonConditionBuilder(groupScan, conditionExp);
- final JsonScanSpec newScanSpec = jsonConditionBuilder.parseTree();
- if (newScanSpec == null) {
- return; //no filter pushdown ==> No transformation.
- }
-
- final JsonTableGroupScan newGroupsScan = new JsonTableGroupScan(groupScan.getUserName(),
- groupScan.getStoragePlugin(),
- groupScan.getFormatPlugin(),
- newScanSpec,
- groupScan.getColumns());
- newGroupsScan.setFilterPushedDown(true);
-
- final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
-
- // Depending on whether is a project in the middle, assign either scan or copy of project to childRel.
- final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));;
-
- if (jsonConditionBuilder.isAllExpressionsConverted()) {
- /*
- * Since we could convert the entire filter condition expression into an HBase filter,
- * we can eliminate the filter operator altogether.
- */
- call.transformTo(childRel);
- } else {
- call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
- }
- }
-
- protected void doPushFilterIntoBinaryGroupScan(final RelOptRuleCall call,
- final FilterPrel filter,
- final ProjectPrel project,
- final ScanPrel scan,
- final BinaryTableGroupScan groupScan,
- final RexNode condition) {
-
- if (groupScan.isFilterPushedDown()) {
- /*
- * The rule can get triggered again due to the transformed "scan => filter" sequence
- * created by the earlier execution of this rule when we could not do a complete
- * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
- * this flag to not do a re-processing of the rule on the already transformed call.
- */
- return;
- }
-
- final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
- final MapRDBFilterBuilder maprdbFilterBuilder = new MapRDBFilterBuilder(groupScan, conditionExp);
- final HBaseScanSpec newScanSpec = maprdbFilterBuilder.parseTree();
- if (newScanSpec == null) {
- return; //no filter pushdown ==> No transformation.
- }
-
- final BinaryTableGroupScan newGroupsScan = new BinaryTableGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
- groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns());
- newGroupsScan.setFilterPushedDown(true);
-
- final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
-
- // Depending on whether is a project in the middle, assign either scan or copy of project to childRel.
- final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));;
-
- if (maprdbFilterBuilder.isAllExpressionsConverted()) {
- /*
- * Since we could convert the entire filter condition expression into an HBase filter,
- * we can eliminate the filter operator altogether.
- */
- call.transformTo(childRel);
- } else {
- call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java
deleted file mode 100644
index 1cd33ca..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.hbase.HBaseRecordReader;
-import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
-import org.apache.drill.exec.store.maprdb.binary.BinaryTableGroupScan;
-import org.apache.drill.exec.store.maprdb.json.MaprDBJsonRecordReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBScanBatchCreator.class);
-
- @Override
- public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
- Preconditions.checkArgument(children.isEmpty());
- List<RecordReader> readers = Lists.newArrayList();
- Configuration conf = HBaseConfiguration.create();
- for(MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
- try {
- if (BinaryTableGroupScan.TABLE_BINARY.equals(subScan.getTableType())) {
- readers.add(new HBaseRecordReader(conf, getHBaseSubScanSpec(scanSpec), subScan.getColumns(), context));
- } else {
- readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPluginConfig(), subScan.getColumns(), context));
- }
- } catch (Exception e1) {
- throw new ExecutionSetupException(e1);
- }
- }
- return new ScanBatch(subScan, context, readers.iterator());
- }
-
- private HBaseSubScanSpec getHBaseSubScanSpec(MapRDBSubScanSpec scanSpec) {
- return new HBaseSubScanSpec(scanSpec.getTableName(), scanSpec.getRegionServer(),
- scanSpec.getStartRow(), scanSpec.getStopRow(), scanSpec.getSerializedFilter(), null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java
deleted file mode 100644
index 7ea4cbf..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.base.AbstractBase;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-
-// Class containing information for reading a single HBase region
-@JsonTypeName("maprdb-sub-scan")
-public class MapRDBSubScan extends AbstractBase implements SubScan {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBSubScan.class);
-
- @JsonProperty
- public final StoragePluginConfig storage;
- @JsonIgnore
- private final MapRDBFormatPluginConfig fsFormatPluginConfig;
- private final FileSystemPlugin fsStoragePlugin;
- private final List<MapRDBSubScanSpec> regionScanSpecList;
- private final List<SchemaPath> columns;
- private final String tableType;
-
- @JsonCreator
- public MapRDBSubScan(@JacksonInject StoragePluginRegistry registry,
- @JsonProperty("userName") String userName,
- @JsonProperty("formatPluginConfig") MapRDBFormatPluginConfig formatPluginConfig,
- @JsonProperty("storage") StoragePluginConfig storage,
- @JsonProperty("regionScanSpecList") List<MapRDBSubScanSpec> regionScanSpecList,
- @JsonProperty("columns") List<SchemaPath> columns,
- @JsonProperty("tableType") String tableType) throws ExecutionSetupException {
- super(userName);
- this.fsFormatPluginConfig = formatPluginConfig;
- this.fsStoragePlugin = (FileSystemPlugin) registry.getPlugin(storage);
- this.regionScanSpecList = regionScanSpecList;
- this.storage = storage;
- this.columns = columns;
- this.tableType = tableType;
- }
-
- public MapRDBSubScan(String userName, MapRDBFormatPluginConfig formatPluginConfig, FileSystemPlugin storagePlugin, StoragePluginConfig config,
- List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType) {
- super(userName);
- fsFormatPluginConfig = formatPluginConfig;
- fsStoragePlugin = storagePlugin;
- storage = config;
- this.regionScanSpecList = maprSubScanSpecs;
- this.columns = columns;
- this.tableType = tableType;
- }
-
- public List<MapRDBSubScanSpec> getRegionScanSpecList() {
- return regionScanSpecList;
- }
-
- public List<SchemaPath> getColumns() {
- return columns;
- }
-
- @Override
- public boolean isExecutable() {
- return false;
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
- return physicalVisitor.visitSubScan(this, value);
- }
-
- @Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
- Preconditions.checkArgument(children.isEmpty());
- return new MapRDBSubScan(getUserName(), fsFormatPluginConfig, fsStoragePlugin, storage, regionScanSpecList, columns, tableType);
- }
-
- @Override
- public Iterator<PhysicalOperator> iterator() {
- return Iterators.emptyIterator();
- }
-
- @Override
- public int getOperatorType() {
- return 1001;
- }
-
- public String getTableType() {
- return tableType;
- }
-
- public MapRDBFormatPluginConfig getFormatPluginConfig() {
- return fsFormatPluginConfig;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java
deleted file mode 100644
index cc8bc5d..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.mapr.fs.jni.MapRConstants;
-import com.mapr.org.apache.hadoop.hbase.util.Bytes;
-
-public class MapRDBSubScanSpec {
-
- protected String tableName;
- protected String regionServer;
- protected byte[] startRow;
- protected byte[] stopRow;
- protected byte[] serializedFilter;
-
- @JsonCreator
- public MapRDBSubScanSpec(@JsonProperty("tableName") String tableName,
- @JsonProperty("regionServer") String regionServer,
- @JsonProperty("startRow") byte[] startRow,
- @JsonProperty("stopRow") byte[] stopRow,
- @JsonProperty("serializedFilter") byte[] serializedFilter,
- @JsonProperty("filterString") String filterString) {
- if (serializedFilter != null && filterString != null) {
- throw new IllegalArgumentException("The parameters 'serializedFilter' or 'filterString' cannot be specified at the same time.");
- }
- this.tableName = tableName;
- this.regionServer = regionServer;
- this.startRow = startRow;
- this.stopRow = stopRow;
- this.serializedFilter = serializedFilter;
- }
-
- /* package */ MapRDBSubScanSpec() {
- // empty constructor, to be used with builder pattern;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public MapRDBSubScanSpec setTableName(String tableName) {
- this.tableName = tableName;
- return this;
- }
-
- public String getRegionServer() {
- return regionServer;
- }
-
- public MapRDBSubScanSpec setRegionServer(String regionServer) {
- this.regionServer = regionServer;
- return this;
- }
-
- /**
- * @return the raw (not-encoded) start row key for this sub-scan
- */
- public byte[] getStartRow() {
- return startRow == null ? MapRConstants.EMPTY_BYTE_ARRAY: startRow;
- }
-
- public MapRDBSubScanSpec setStartRow(byte[] startRow) {
- this.startRow = startRow;
- return this;
- }
-
- /**
- * @return the raw (not-encoded) stop row key for this sub-scan
- */
- public byte[] getStopRow() {
- return stopRow == null ? MapRConstants.EMPTY_BYTE_ARRAY : stopRow;
- }
-
- public MapRDBSubScanSpec setStopRow(byte[] stopRow) {
- this.stopRow = stopRow;
- return this;
- }
-
- public byte[] getSerializedFilter() {
- return serializedFilter;
- }
-
- public MapRDBSubScanSpec setSerializedFilter(byte[] serializedFilter) {
- this.serializedFilter = serializedFilter;
- return this;
- }
-
- @Override
- public String toString() {
- return "MapRDBSubScanSpec [tableName=" + tableName
- + ", startRow=" + (startRow == null ? null : Bytes.toStringBinary(startRow))
- + ", stopRow=" + (stopRow == null ? null : Bytes.toStringBinary(stopRow))
- + ", filter=" + (getSerializedFilter() == null ? null : Bytes.toBase64(getSerializedFilter()))
- + ", regionServer=" + regionServer + "]";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java
deleted file mode 100644
index d2b1453..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.mapr.TableMappingRulesFactory;
-
-import com.mapr.fs.hbase.HBaseAdminImpl;
-
-public class MapRDBTableStats {
- private static volatile HBaseAdminImpl admin = null;
-
- private long numRows;
-
- public MapRDBTableStats(Configuration conf, String tablePath) throws Exception {
- if (admin == null) {
- synchronized (MapRDBTableStats.class) {
- if (admin == null) {
- Configuration config = conf;
- admin = new HBaseAdminImpl(config, TableMappingRulesFactory.create(conf));
- }
- }
- }
- numRows = admin.getNumRows(tablePath);
- }
-
- public long getNumRows() {
- return numRows;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java
deleted file mode 100644
index 389f00d..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-
-import com.mapr.db.impl.TabletInfoImpl;
-
-public class TabletFragmentInfo implements Comparable<TabletFragmentInfo> {
-
- final private HRegionInfo regionInfo;
- final private TabletInfoImpl tabletInfoImpl;
-
- public TabletFragmentInfo(HRegionInfo regionInfo) {
- this(null, regionInfo);
- }
-
- public TabletFragmentInfo(TabletInfoImpl tabletInfoImpl) {
- this(tabletInfoImpl, null);
- }
-
- TabletFragmentInfo(TabletInfoImpl tabletInfoImpl, HRegionInfo regionInfo) {
- this.regionInfo = regionInfo;
- this.tabletInfoImpl = tabletInfoImpl;
- }
-
- public HRegionInfo getRegionInfo() {
- return regionInfo;
- }
-
- public TabletInfoImpl getTabletInfoImpl() {
- return tabletInfoImpl;
- }
-
- public boolean containsRow(byte[] row) {
- return tabletInfoImpl != null ? tabletInfoImpl.containsRow(row) :
- regionInfo.containsRow(row);
- }
-
- public byte[] getStartKey() {
- return tabletInfoImpl != null ? tabletInfoImpl.getStartRow() :
- regionInfo.getStartKey();
- }
-
- public byte[] getEndKey() {
- return tabletInfoImpl != null ? tabletInfoImpl.getStopRow() :
- regionInfo.getEndKey();
- }
-
- @Override
- public int compareTo(TabletFragmentInfo o) {
- return tabletInfoImpl != null ? tabletInfoImpl.compareTo(o.tabletInfoImpl) :
- regionInfo.compareTo(o.regionInfo);
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((regionInfo == null) ? 0 : regionInfo.hashCode());
- result = prime * result + ((tabletInfoImpl == null) ? 0 : tabletInfoImpl.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- TabletFragmentInfo other = (TabletFragmentInfo) obj;
- if (regionInfo == null) {
- if (other.regionInfo != null)
- return false;
- } else if (!regionInfo.equals(other.regionInfo))
- return false;
- if (tabletInfoImpl == null) {
- if (other.tabletInfoImpl != null)
- return false;
- } else if (!tabletInfoImpl.equals(other.tabletInfoImpl))
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return "TabletFragmentInfo [regionInfo=" + regionInfo + ", tabletInfoImpl=" + tabletInfoImpl
- + "]";
- }
-
-}