You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2016/01/11 08:54:00 UTC
[12/12] drill git commit: DRILL-4241: Add Single Tablet Writer
DRILL-4241: Add Single Tablet Writer
- Also move to a test bootstrap
- Update to the latest kudu and Drill
- Add plugin to Drill distribution
- Checkstyle and directory cleanup
This closes #314.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/392d1f7e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/392d1f7e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/392d1f7e
Branch: refs/heads/master
Commit: 392d1f7e9398fb1bee8f67b0d49a82436c3145fb
Parents: 3694909
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu Nov 19 18:20:03 2015 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Jan 10 23:46:44 2016 -0800
----------------------------------------------------------------------
contrib/pom.xml | 1 +
contrib/storage-kudu/pom.xml | 114 ++++++++++--
.../storage-kudu/src/main/codegen/config.fmpp | 23 +++
.../codegen/templates/KuduRecordWriter.java | 175 +++++++++++++++++++
.../drill/exec/store/kudu/KuduRecordReader.java | 119 ++++++-------
.../exec/store/kudu/KuduRecordWriterImpl.java | 174 ++++++++++++++++++
.../exec/store/kudu/KuduSchemaFactory.java | 21 +++
.../exec/store/kudu/KuduStoragePlugin.java | 13 +-
.../drill/exec/store/kudu/KuduWriter.java | 79 +++++++++
.../exec/store/kudu/KuduWriterBatchCreator.java | 43 +++++
.../resources/bootstrap-storage-plugins.json | 4 +-
.../src/main/resources/checkstyle-config.xml | 42 -----
.../main/resources/checkstyle-suppressions.xml | 19 --
.../drill/store/kudu/TestKuduConnect.java | 9 +-
.../apache/drill/store/kudu/TestKuduPlugin.java | 10 ++
.../resources/bootstrap-storage-plugins.json | 9 +
distribution/pom.xml | 5 +
distribution/src/assemble/bin.xml | 1 +
18 files changed, 706 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index bc95910..87b84c1 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -36,6 +36,7 @@
<module>storage-hive</module>
<module>storage-mongo</module>
<module>storage-jdbc</module>
+ <module>storage-kudu</module>
<module>sqlline</module>
<module>data</module>
<module>gis</module>
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/pom.xml b/contrib/storage-kudu/pom.xml
index 7e57ca8..cca319f 100644
--- a/contrib/storage-kudu/pom.xml
+++ b/contrib/storage-kudu/pom.xml
@@ -16,23 +16,19 @@
<parent>
<artifactId>drill-contrib-parent</artifactId>
<groupId>org.apache.drill.contrib</groupId>
- <version>1.3.0</version>
+ <version>1.5.0-SNAPSHOT</version>
</parent>
- <artifactId>drill-storage-kudu</artifactId>
- <version>1.3.0-SNAPSHOT</version>
+ <artifactId>drill-kudu-storage</artifactId>
<name>contrib/kudu-storage-plugin</name>
- <properties>
- <drill.version>1.3.0</drill.version>
- </properties>
<dependencies>
<dependency>
<groupId>org.apache.drill.exec</groupId>
<artifactId>drill-java-exec</artifactId>
- <version>${drill.version}</version>
+ <version>${project.version}</version>
</dependency>
<!-- Test dependencies -->
@@ -40,7 +36,7 @@
<groupId>org.apache.drill.exec</groupId>
<artifactId>drill-java-exec</artifactId>
<classifier>tests</classifier>
- <version>${drill.version}</version>
+ <version>${project.version}</version>
<scope>test</scope>
</dependency>
@@ -48,27 +44,20 @@
<groupId>org.apache.drill</groupId>
<artifactId>drill-common</artifactId>
<classifier>tests</classifier>
- <version>${drill.version}</version>
+ <version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.kududb</groupId>
<artifactId>kudu-client</artifactId>
- <version>0.5.0</version>
+ <version>0.6.0</version>
</dependency>
</dependencies>
<repositories>
<repository>
- <id>drill-1016</id>
- <url>https://repository.apache.org/content/repositories/orgapachedrill-1016/</url>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- <repository>
<id>cdh.repo</id>
<name>Cloudera Repositories</name>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
@@ -78,9 +67,98 @@
</repository>
</repositories>
+ <pluginRepositories>
+ <pluginRepository>
+ <id>apache-snapshots</id>
+ <url>https://repository.apache.org/content/groups/snapshots/</url>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+
+ </pluginRepository>
+ </pluginRepositories>
<build>
<plugins>
-
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution> <!-- copy all templates/data in the same location to compile them at once -->
+ <id>copy-fmpp-resources</id>
+ <phase>initialize</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/codegen</outputDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/codegen</directory>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.8</version>
+ <executions>
+ <!-- Extract ValueVectorTypes.tdd from drill-vector.jar and put
+ it under ${project.build.directory}/codegen/data where all freemarker data
+ files are. -->
+ <execution>
+ <id>unpack-vector-types</id>
+ <phase>initialize</phase>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>vector</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ <outputDirectory>${project.build.directory}/</outputDirectory>
+ <includes>codegen/data/ValueVectorTypes.tdd</includes>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin> <!-- generate sources from fmpp -->
+ <groupId>org.apache.drill.tools</groupId>
+ <artifactId>drill-fmpp-maven-plugin</artifactId>
+ <version>${project.version}</version>
+ <dependencies>
+ <dependency>
+ <groupId>org.freemarker</groupId>
+ <artifactId>freemarker</artifactId>
+ <version>2.3.19</version>
+ </dependency>
+ </dependencies>
+ <executions>
+ <execution>
+ <id>generate-fmpp</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ <configuration>
+ <config>${project.build.directory}/codegen/config.fmpp</config>
+ <output>${project.build.directory}/generated-sources</output>
+ <templates>${project.build.directory}/codegen/templates</templates>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/src/main/codegen/config.fmpp
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/codegen/config.fmpp b/contrib/storage-kudu/src/main/codegen/config.fmpp
new file mode 100644
index 0000000..40a29b4
--- /dev/null
+++ b/contrib/storage-kudu/src/main/codegen/config.fmpp
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+data: {
+ vv: tdd(../data/ValueVectorTypes.tdd),
+
+}
+freemarkerLinks: {
+ includes: includes/
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/src/main/codegen/templates/KuduRecordWriter.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/codegen/templates/KuduRecordWriter.java b/contrib/storage-kudu/src/main/codegen/templates/KuduRecordWriter.java
new file mode 100644
index 0000000..01c7c28
--- /dev/null
+++ b/contrib/storage-kudu/src/main/codegen/templates/KuduRecordWriter.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="org/apache/drill/exec/store/kudu/KuduRecordWriter.java" />
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.kudu;
+
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.fn.JsonOutput;
+import java.io.IOException;
+import java.lang.UnsupportedOperationException;
+import java.util.List;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.io.api.Binary;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.common.types.TypeProtos;
+import org.joda.time.DateTimeUtils;
+import java.io.IOException;
+import java.lang.UnsupportedOperationException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.io.api.Binary;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.common.types.TypeProtos;
+import org.joda.time.DateTimeUtils;
+import java.io.IOException;
+import java.lang.UnsupportedOperationException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.kududb.client.*;
+import org.apache.drill.exec.store.*;
+
+public abstract class KuduRecordWriter extends AbstractRecordWriter implements RecordWriter {
+
+ private PartialRow row;
+
+ public void setUp(PartialRow row) {
+ this.row = row;
+ }
+
+ <#list vv.types as type>
+ <#list type.minor as minor>
+ <#list vv.modes as mode>
+
+ <#if mode.prefix == "Repeated" ||
+ minor.class == "TinyInt" ||
+ minor.class == "UInt1" ||
+ minor.class == "UInt2" ||
+ minor.class == "SmallInt" ||
+ minor.class == "Time" ||
+ minor.class == "Decimal9" ||
+ minor.class == "Decimal18" ||
+ minor.class == "Date" ||
+ minor.class == "UInt4" ||
+ minor.class == "Decimal28Sparse" ||
+ minor.class == "Decimal38Sparse" ||
+ minor.class?contains("Interval")
+ >
+
+ <#else>
+ @Override
+ public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) {
+ return new ${mode.prefix}${minor.class}KuduConverter(fieldId, fieldName, reader);
+ }
+
+ public class ${mode.prefix}${minor.class}KuduConverter extends FieldConverter {
+ private Nullable${minor.class}Holder holder = new Nullable${minor.class}Holder();
+
+ public ${mode.prefix}${minor.class}KuduConverter(int fieldId, String fieldName, FieldReader reader) {
+ super(fieldId, fieldName, reader);
+ }
+
+ @Override
+ public void writeField() throws IOException {
+
+ <#if mode.prefix == "Nullable" >
+ if (!reader.isSet()) {
+ return;
+ }
+ </#if>
+
+ reader.read(holder);
+
+ <#if minor.class == "Float4">
+ row.addFloat(fieldId, holder.value);
+ <#elseif minor.class == "TimeStamp">
+ row.addLong(fieldId, holder.value*1000);
+ <#elseif minor.class == "Int">
+ row.addInt(fieldId, holder.value);
+ <#elseif minor.class == "BigInt">
+ row.addLong(fieldId, holder.value);
+ <#elseif minor.class == "Float8">
+ row.addDouble(fieldId, holder.value);
+ <#elseif minor.class == "Bit">
+ row.addBoolean(fieldId, holder.value == 1);
+ <#elseif minor.class == "VarChar" >
+ byte[] bytes = new byte[holder.end - holder.start];
+ holder.buffer.getBytes(holder.start, bytes);
+ row.addStringUtf8(fieldId, bytes);
+ <#elseif minor.class == "VarBinary">
+ byte[] bytes = new byte[holder.end - holder.start];
+ holder.buffer.getBytes(holder.start, bytes);
+ row.addBinary(fieldId, bytes);
+ reader.read(holder);
+ <#else>
+ throw new UnsupportedOperationException();
+ </#if>
+ }
+ }
+ </#if>
+ </#list>
+ </#list>
+ </#list>
+ }
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
index a97df77..abd2ab7 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
@@ -75,7 +75,7 @@ public class KuduRecordReader extends AbstractRecordReader {
private final KuduSubScanSpec scanSpec;
private KuduScanner scanner;
private RowResultIterator iterator;
-
+
private OutputMutator output;
private OperatorContext context;
@@ -84,8 +84,8 @@ public class KuduRecordReader extends AbstractRecordReader {
ValueVector vv;
ColumnSchema kuduColumn;
}
- private ImmutableList<ProjectedColumnInfo> projectedCols;
+ private ImmutableList<ProjectedColumnInfo> projectedCols;
public KuduRecordReader(KuduClient client, KuduSubScan.KuduSubScanSpec subScanSpec,
List<SchemaPath> projectedColumns, FragmentContext context) {
@@ -101,7 +101,7 @@ public class KuduRecordReader extends AbstractRecordReader {
this.context = context;
try {
KuduTable table = client.openTable(scanSpec.getTableName());
-
+
KuduScannerBuilder builder = client.newScannerBuilder(table);
if (!isStarQuery()) {
List<String> colNames = Lists.newArrayList();
@@ -114,9 +114,9 @@ public class KuduRecordReader extends AbstractRecordReader {
context.getStats().startWait();
try {
scanner = builder
- .lowerBoundPartitionKeyRaw(scanSpec.getStartKey())
- .exclusiveUpperBoundPartitionKeyRaw(scanSpec.getEndKey())
- .build();
+ .lowerBoundPartitionKeyRaw(scanSpec.getStartKey())
+ .exclusiveUpperBoundPartitionKeyRaw(scanSpec.getEndKey())
+ .build();
} finally {
context.getStats().stopWait();
}
@@ -125,7 +125,7 @@ public class KuduRecordReader extends AbstractRecordReader {
}
}
- static final Map<Type,MinorType> TYPES;
+ static final Map<Type, MinorType> TYPES;
static {
TYPES = ImmutableMap.<Type, MinorType> builder()
@@ -169,14 +169,14 @@ public class KuduRecordReader extends AbstractRecordReader {
}
return rowCount;
}
-
+
@SuppressWarnings("unchecked")
private void initCols(Schema schema) throws SchemaChangeException {
ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder();
-
+
for (int i = 0; i < schema.getColumnCount(); i++) {
- ColumnSchema col = schema.getColumnByIndex(i);
-
+ ColumnSchema col = schema.getColumnByIndex(i);
+
final String name = col.getName();
final Type kuduType = col.getType();
MinorType minorType = TYPES.get(kuduType);
@@ -184,7 +184,7 @@ public class KuduRecordReader extends AbstractRecordReader {
logger.warn("Ignoring column that is unsupported.", UserException
.unsupportedError()
.message(
- "A column you queried has a data type that is not currently supported by the JDBC storage plugin. "
+ "A column you queried has a data type that is not currently supported by the Kudu storage plugin. "
+ "The column's name was %s and its Kudu data type was %s. ",
name, kuduType.toString())
.addContext("column Name", name)
@@ -204,7 +204,7 @@ public class KuduRecordReader extends AbstractRecordReader {
minorType, majorType.getMode());
ValueVector vector = output.addField(field, clazz);
vector.allocateNew();
-
+
ProjectedColumnInfo pci = new ProjectedColumnInfo();
pci.vv = vector;
pci.kuduColumn = col;
@@ -214,111 +214,109 @@ public class KuduRecordReader extends AbstractRecordReader {
projectedCols = pciBuilder.build();
}
-
+
private void addRowResult(RowResult result, int rowIndex) throws SchemaChangeException {
if (projectedCols == null) {
initCols(result.getColumnProjection());
}
-
+
for (ProjectedColumnInfo pci : projectedCols) {
if (result.isNull(pci.index)) {
continue;
}
switch (pci.kuduColumn.getType()) {
- case BINARY:
- {
+ case BINARY: {
ByteBuffer value = result.getBinary(pci.index);
if (pci.kuduColumn.isNullable()) {
- ((NullableVarBinaryVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, value, 0, value.remaining());
+ ((NullableVarBinaryVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, value, 0, value.remaining());
} else {
- ((VarBinaryVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, value, 0, value.remaining());
+ ((VarBinaryVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, value, 0, value.remaining());
}
break;
}
- case STRING:
- {
+ case STRING: {
ByteBuffer value = result.getBinary(pci.index);
if (pci.kuduColumn.isNullable()) {
- ((NullableVarCharVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, value, 0, value.remaining());
+ ((NullableVarCharVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, value, 0, value.remaining());
} else {
- ((VarCharVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, value, 0, value.remaining());
+ ((VarCharVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, value, 0, value.remaining());
}
break;
}
case BOOL:
if (pci.kuduColumn.isNullable()) {
- ((NullableBitVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getBoolean(pci.index) ? 1 : 0);
+ ((NullableBitVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getBoolean(pci.index) ? 1 : 0);
} else {
- ((BitVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getBoolean(pci.index) ? 1 : 0);
+ ((BitVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getBoolean(pci.index) ? 1 : 0);
}
break;
case DOUBLE:
if (pci.kuduColumn.isNullable()) {
- ((NullableFloat8Vector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getDouble(pci.index));
+ ((NullableFloat8Vector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getDouble(pci.index));
} else {
- ((Float8Vector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getDouble(pci.index));
+ ((Float8Vector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getDouble(pci.index));
}
break;
case FLOAT:
if (pci.kuduColumn.isNullable()) {
- ((NullableFloat4Vector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getFloat(pci.index));
+ ((NullableFloat4Vector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getFloat(pci.index));
} else {
- ((Float4Vector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getFloat(pci.index));
+ ((Float4Vector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getFloat(pci.index));
}
break;
case INT16:
if (pci.kuduColumn.isNullable()) {
- ((NullableIntVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getShort(pci.index));
+ ((NullableIntVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getShort(pci.index));
} else {
- ((IntVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getShort(pci.index));
+ ((IntVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getShort(pci.index));
}
break;
case INT32:
if (pci.kuduColumn.isNullable()) {
- ((NullableIntVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getInt(pci.index));
+ ((NullableIntVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getInt(pci.index));
} else {
- ((IntVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getInt(pci.index));
+ ((IntVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getInt(pci.index));
}
break;
case INT8:
if (pci.kuduColumn.isNullable()) {
- ((NullableIntVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getByte(pci.index));
+ ((NullableIntVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getByte(pci.index));
} else {
- ((IntVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getByte(pci.index));
+ ((IntVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getByte(pci.index));
}
break;
case INT64:
if (pci.kuduColumn.isNullable()) {
- ((NullableBigIntVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getLong(pci.index));
+ ((NullableBigIntVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getLong(pci.index));
} else {
- ((BigIntVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getLong(pci.index));
+ ((BigIntVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getLong(pci.index));
}
break;
case TIMESTAMP:
if (pci.kuduColumn.isNullable()) {
- ((NullableTimeStampVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getLong(pci.index) / 1000);
+ ((NullableTimeStampVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getLong(pci.index) / 1000);
} else {
- ((TimeStampVector.Mutator)pci.vv.getMutator())
- .setSafe(rowIndex, result.getLong(pci.index) / 1000);
+ ((TimeStampVector.Mutator) pci.vv.getMutator())
+ .setSafe(rowIndex, result.getLong(pci.index) / 1000);
}
break;
default:
@@ -327,9 +325,8 @@ public class KuduRecordReader extends AbstractRecordReader {
}
}
-
@Override
public void close() {
}
-
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java
new file mode 100644
index 0000000..6b39cc5
--- /dev/null
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kudu;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.kududb.ColumnSchema;
+import org.kududb.Schema;
+import org.kududb.Type;
+import org.kududb.client.Insert;
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduSession;
+import org.kududb.client.KuduTable;
+import org.kududb.client.OperationResponse;
+import org.kududb.client.SessionConfiguration.FlushMode;
+
+public class KuduRecordWriterImpl extends KuduRecordWriter {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduRecordWriterImpl.class);
+
+ private static final int FLUSH_FREQUENCY = 100;
+
+ private final KuduClient client;
+ private final String name;
+ private final OperatorContext context;
+ private KuduTable table;
+ private KuduSession session;
+
+ private Insert insert;
+ private int recordsSinceFlush;
+
+ public KuduRecordWriterImpl(OperatorContext context, KuduClient client, String name) {
+ this.client = client;
+ this.name = name;
+ this.context = context;
+ session = client.newSession();
+ session.setFlushMode(FlushMode.MANUAL_FLUSH);
+ }
+
+ @Override
+ public void init(Map<String, String> writerOptions) throws IOException {
+
+ }
+
+ @Override
+ public void updateSchema(VectorAccessible batch) throws IOException {
+ BatchSchema schema = batch.getSchema();
+ int i = 0;
+
+ try {
+ if (!checkForTable(name)) {
+ List<ColumnSchema> columns = new ArrayList<>();
+ for (MaterializedField f : schema) {
+ columns.add(new ColumnSchema.ColumnSchemaBuilder(f.getLastName(), getType(f.getType()))
+ .nullable(f.getType().getMode() == DataMode.OPTIONAL)
+ .key(i == 0).build());
+ i++;
+ }
+ Schema kuduSchema = new Schema(columns);
+ table = client.createTable(name, kuduSchema);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private boolean checkForTable(String name) throws Exception {
+ return !client.getTablesList(name).getTablesList().isEmpty();
+ }
+
+ private Type getType(MajorType t) {
+
+ if(t.getMode() == DataMode.REPEATED){
+ throw UserException
+ .dataWriteError()
+ .message("Kudu does not support array types.")
+ .build(logger);
+ }
+
+ switch (t.getMinorType()) {
+ case BIGINT:
+ return Type.INT64;
+ case BIT:
+ return Type.BOOL;
+ case FLOAT4:
+ return Type.FLOAT;
+ case FLOAT8:
+ return Type.DOUBLE;
+ case INT:
+ return Type.INT32;
+ case TIMESTAMP:
+ return Type.TIMESTAMP;
+ case VARBINARY:
+ return Type.BINARY;
+ case VARCHAR:
+ return Type.STRING;
+ default:
+ throw UserException
+ .dataWriteError()
+ .message("Data type: '%s' not supported in Kudu.", t.getMinorType().name())
+ .build(logger);
+ }
+ }
+
+ @Override
+ public void startRecord() throws IOException {
+ insert = table.newInsert();
+ setUp(insert.getRow());
+ }
+
+ @Override
+ public void endRecord() throws IOException {
+ try {
+ session.apply(insert);
+ recordsSinceFlush++;
+ if (recordsSinceFlush == FLUSH_FREQUENCY) {
+ flush();
+ recordsSinceFlush = 0;
+ }
+ insert = null;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void abort() throws IOException {
+ }
+
+ private void flush() throws IOException {
+ try {
+ // context.getStats().startWait();
+ List<OperationResponse> responses = session.flush();
+ for (OperationResponse response : responses) {
+ if (response.hasRowError()) {
+ throw new IOException(response.getRowError().toString());
+ }
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ } finally {
+ // context.getStats().stopWait();
+ }
+ }
+
+ @Override
+ public void cleanup() throws IOException {
+ flush();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
index 7ea4f2f..af2775d 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
@@ -19,11 +19,15 @@ package org.apache.drill.exec.store.kudu;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Writer;
+import org.apache.drill.exec.planner.logical.CreateTableEntry;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.SchemaFactory;
@@ -97,6 +101,23 @@ public class KuduSchemaFactory implements SchemaFactory {
}
@Override
+ public CreateTableEntry createNewTable(final String tableName, List<String> partitionColumns) {
+ return new CreateTableEntry(){
+
+ @Override
+ public Writer getWriter(PhysicalOperator child) throws IOException {
+ return new KuduWriter(child, tableName, plugin);
+ }
+
+ @Override
+ public List<String> getPartitionColumns() {
+ return Collections.emptyList();
+ }
+
+ };
+ }
+
+ @Override
public void dropTable(String tableName) {
try {
plugin.getClient().deleteTable(tableName);
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
index 5e981b8..15aa469 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
@@ -18,16 +18,12 @@
package org.apache.drill.exec.store.kudu;
import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.JSONOptions;
-import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.kududb.client.KuduClient;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -83,6 +79,11 @@ public class KuduStoragePlugin extends AbstractStoragePlugin {
}
@Override
+ // Advertises writer (CTAS) support so the planner will route CREATE TABLE AS
+ // statements to this plugin's KuduWriter.
+ public boolean supportsWrite() {
+ return true;
+ }
+
+ @Override
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(schemaConfig, parent);
}
@@ -92,8 +93,4 @@ public class KuduStoragePlugin extends AbstractStoragePlugin {
return engineConfig;
}
- @Override
- public Set<StoragePluginOptimizerRule> getOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
- return Collections.EMPTY_SET;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
new file mode 100644
index 0000000..03e29d3
--- /dev/null
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kudu;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Physical-plan writer operator for a single Kudu table. Holds only the target
+ * table name and the resolved storage plugin; the actual row writing is done by
+ * KuduRecordWriterImpl (created in KuduWriterBatchCreator).
+ */
+public class KuduWriter extends AbstractWriter {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduWriter.class);
+
+ private final KuduStoragePlugin plugin;
+ private final String name;
+
+ // Jackson deserialization constructor: rebuilds the operator from a serialized
+ // physical plan, resolving the plugin instance from the registry by its config.
+ @JsonCreator
+ public KuduWriter(
+ @JsonProperty("child") PhysicalOperator child,
+ @JsonProperty("name") String name,
+ @JsonProperty("storage") StoragePluginConfig storageConfig,
+ @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
+ super(child);
+ this.plugin = (KuduStoragePlugin) engineRegistry.getPlugin(storageConfig);
+ this.name = name;
+ }
+
+
+ // Direct-construction variant used in-process (e.g. by KuduSchemaFactory's
+ // CreateTableEntry) where the plugin instance is already at hand.
+ KuduWriter(PhysicalOperator child, String name, KuduStoragePlugin plugin) {
+ super(child);
+ this.name = name;
+ this.plugin = plugin;
+ }
+
+ @Override
+ public int getOperatorType() {
+ // NOTE(review): hard-coded operator id; presumably must match the id registered
+ // for this operator in Drill's operator-type enumeration — confirm 3001 is unique.
+ return 3001;
+ }
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ // Re-parents the writer during plan transformation, preserving table and plugin.
+ return new KuduWriter(child, name, plugin);
+ }
+
+ // Target Kudu table name (serialized as "name").
+ public String getName() {
+ return name;
+ }
+
+ // Serialized as "storage"; lets the Jackson constructor re-resolve the plugin.
+ public StoragePluginConfig getStorage() {
+ return plugin.getConfig();
+ }
+
+ @JsonIgnore
+ public KuduStoragePlugin getPlugin() {
+ return plugin;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java
new file mode 100644
index 0000000..c200c17
--- /dev/null
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kudu;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * Execution-time factory: turns a KuduWriter physical operator into a
+ * WriterRecordBatch backed by a KuduRecordWriterImpl for the target table.
+ */
+public class KuduWriterBatchCreator implements BatchCreator<KuduWriter> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduWriterBatchCreator.class);
+
+ @Override
+ public CloseableRecordBatch getBatch(FragmentContext context, KuduWriter config, List<RecordBatch> children)
+ throws ExecutionSetupException {
+ // A writer consumes exactly one upstream batch (the data to insert).
+ assert children != null && children.size() == 1;
+
+ // NOTE(review): first constructor argument is passed as null — presumably an
+ // optional context/config parameter KuduRecordWriterImpl tolerates; confirm
+ // against its constructor signature.
+ return new WriterRecordBatch(config, children.iterator().next(), context, new KuduRecordWriterImpl(
+ null,
+ config.getPlugin().getClient(),
+ config.getName()));
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json
index 3ba12c0..52884a6 100644
--- a/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json
+++ b/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json
@@ -2,8 +2,8 @@
"storage":{
kudu : {
type:"kudu",
- masterAddresses: "172.31.1.99",
- enabled: true
+ masterAddresses: "1.2.3.4",
+ enabled: false
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/src/main/resources/checkstyle-config.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/resources/checkstyle-config.xml b/contrib/storage-kudu/src/main/resources/checkstyle-config.xml
deleted file mode 100644
index 74cc856..0000000
--- a/contrib/storage-kudu/src/main/resources/checkstyle-config.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to^M
- You under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
- language governing permissions and limitations under the License. -->
-<!DOCTYPE module PUBLIC
- "-//Puppy Crawl//DTD Check Configuration 1.2//EN"
- "http://www.puppycrawl.com/dtds/configuration_1_2.dtd">
-
-<!-- Checkstyle configuration for Apache Drill -->
-
-<module name="Checker">
-
- <module name="TreeWalker">
-
- <module name="IllegalImport">
- <!-- For "org.apache.commons.lang.*" classes use the corresponding class from "org.apache.commons.lang3.*" -->
- <property name="illegalPkgs" value="com.google.hive12,com.beust.jcommander.internal,jersey.repackaged,org.apache.commons.lang"/>
- </module>
-
- <module name="AvoidStarImport">
- <property name="allowStaticMemberImports" value="true"/>
- </module>
-
- <module name="NeedBraces"/>
-
- </module>
-
- <module name="FileTabCharacter"/>
-
- <module name="RegexpSingleline">
- <property name="format" value="\s+$"/>
- <property name="message" value="A line of code cannot contain any trailing whitespace"/>
- </module>
-
-</module>
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/src/main/resources/checkstyle-suppressions.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/resources/checkstyle-suppressions.xml b/contrib/storage-kudu/src/main/resources/checkstyle-suppressions.xml
deleted file mode 100644
index 9d4682b..0000000
--- a/contrib/storage-kudu/src/main/resources/checkstyle-suppressions.xml
+++ /dev/null
@@ -1,19 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to
- You under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
- language governing permissions and limitations under the License. -->
-<!DOCTYPE suppressions PUBLIC
- "-//Puppy Crawl//DTD Suppressions 1.1//EN"
- "suppressions_1_1.dtd">
-
-<!-- Checkstyle Suppressions for Apache Drill -->
-<suppressions>
- <suppress files="[\\/]generated-sources[\\/]" checks="AvoidStarImport,NeedBraces"/>
-</suppressions>
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java
index 5f36f80..0ee0134 100644
--- a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java
+++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java
@@ -20,11 +20,12 @@ package org.apache.drill.store.kudu;
import java.util.ArrayList;
import java.util.List;
+import org.junit.Ignore;
import org.junit.Test;
import org.kududb.ColumnSchema;
import org.kududb.Schema;
import org.kududb.Type;
-import org.kududb.client.CreateTableBuilder;
+import org.kududb.client.CreateTableOptions;
import org.kududb.client.Insert;
import org.kududb.client.KuduClient;
import org.kududb.client.KuduScanner;
@@ -36,9 +37,7 @@ import org.kududb.client.RowResult;
import org.kududb.client.RowResultIterator;
import org.kududb.client.SessionConfiguration;
-import static org.kududb.Type.STRING;
-
-
+@Ignore("requires remote kudu server")
public class TestKuduConnect {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestKuduConnect.class);
@@ -62,7 +61,7 @@ public class TestKuduConnect {
Schema schema = new Schema(columns);
- CreateTableBuilder builder = new CreateTableBuilder();
+ CreateTableOptions builder = new CreateTableOptions();
builder.setNumReplicas(replicas);
for (int i = 1; i < tablets; i++) {
PartialRow splitRow = schema.newPartialRow();
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java
index d5e138f..450a1ad 100644
--- a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java
+++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java
@@ -18,8 +18,10 @@
package org.apache.drill.store.kudu;
import org.apache.drill.BaseTestQuery;
+import org.junit.Ignore;
import org.junit.Test;
+@Ignore("requires a remote kudu server to run.")
public class TestKuduPlugin extends BaseTestQuery {
@Test
@@ -33,4 +35,12 @@ public class TestKuduPlugin extends BaseTestQuery {
test("show tables;");
test("describe demo");
}
+
+ // CTAS round-trip: create a table from a one-row query, read it back, drop it.
+ // (Class-level @Ignore applies — needs a live Kudu cluster.)
+ @Test
+ public void testCreate() throws Exception {
+ test("create table kudu.regions as select 1, * from sys.options limit 1");
+ test("select * from kudu.regions");
+ test("drop table kudu.regions");
+
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/contrib/storage-kudu/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-kudu/src/test/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..3ba12c0
--- /dev/null
+++ b/contrib/storage-kudu/src/test/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,9 @@
+{
+ "storage":{
+ kudu : {
+ type:"kudu",
+ masterAddresses: "172.31.1.99",
+ enabled: true
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 5aaf09d..135c974 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -168,6 +168,11 @@
</dependency>
<dependency>
<groupId>org.apache.drill.contrib</groupId>
+ <artifactId>drill-kudu-storage</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.contrib</groupId>
<artifactId>drill-mongo-storage</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/drill/blob/392d1f7e/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index 449ac6c..12682e2 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -97,6 +97,7 @@
<include>org.apache.drill.contrib:drill-mongo-storage</include>
<include>org.apache.drill.contrib:drill-storage-hbase</include>
<include>org.apache.drill.contrib:drill-jdbc-storage</include>
+ <include>org.apache.drill.contrib:drill-kudu-storage</include>
<include>org.apache.drill.contrib:drill-gis</include>
</includes>
<excludes>