You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/07 03:20:55 UTC
[1/6] git commit: DRILL-643: Fix ByteBuf allocation in
o.a.d.exec.vector.ValueHolderHelper.getVarCharHolder()
Repository: incubator-drill
Updated Branches:
refs/heads/master e790e7962 -> 08923cb82
DRILL-643: Fix ByteBuf allocation in o.a.d.exec.vector.ValueHolderHelper.getVarCharHolder()
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/3af2344f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/3af2344f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/3af2344f
Branch: refs/heads/master
Commit: 3af2344f27749d63890a379c5b9ed4d92b142a73
Parents: e790e79
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Tue May 6 01:17:18 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue May 6 15:39:45 2014 -0700
----------------------------------------------------------------------
.../drill/exec/vector/ValueHolderHelper.java | 19 +++++++++----------
1 file changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3af2344f/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
index e4af851..fb9dfd0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
@@ -17,35 +17,34 @@
*/
package org.apache.drill.exec.vector;
-import java.nio.ByteOrder;
-import io.netty.buffer.ByteBuf;
import io.netty.buffer.SwappedByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
+import java.math.BigDecimal;
+import java.nio.ByteOrder;
+
import org.apache.drill.common.util.DecimalUtility;
-import org.apache.drill.exec.expr.holders.VarCharHolder;
-import org.apache.drill.exec.expr.holders.IntervalDayHolder;
-import org.apache.drill.exec.expr.holders.Decimal9Holder;
import org.apache.drill.exec.expr.holders.Decimal18Holder;
import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal9Holder;
+import org.apache.drill.exec.expr.holders.IntervalDayHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
import com.google.common.base.Charsets;
-import java.math.BigDecimal;
-
public class ValueHolderHelper {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ValueHolderHelper.class);
-
+
public static VarCharHolder getVarCharHolder(String s){
VarCharHolder vch = new VarCharHolder();
-
+
byte[] b = s.getBytes(Charsets.UTF_8);
vch.start = 0;
vch.end = b.length;
- vch.buffer = UnpooledByteBufAllocator.DEFAULT.buffer(s.length()).order(ByteOrder.LITTLE_ENDIAN); // use the length of input string to allocate buffer.
+ vch.buffer = UnpooledByteBufAllocator.DEFAULT.buffer(b.length).order(ByteOrder.LITTLE_ENDIAN); // use the length of input string to allocate buffer.
vch.buffer.setBytes(0, b);
return vch;
}
[2/6] git commit: DRILL-644: Fix char length and position calculation
for UTF-8 strings in StringFunctionUtil
Posted by ja...@apache.org.
DRILL-644: Fix char length and position calculation for UTF-8 strings in StringFunctionUtil
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0f65fd8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0f65fd8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0f65fd8d
Branch: refs/heads/master
Commit: 0f65fd8d3bb40300e5455f31603d41dd9a149bb6
Parents: 3af2344
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Tue May 6 01:25:09 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue May 6 15:40:10 2014 -0700
----------------------------------------------------------------------
.../exec/expr/fn/impl/StringFunctionUtil.java | 48 +++++++++++---------
.../exec/physical/impl/TestStringFunctions.java | 37 ++++++---------
.../resources/functions/string/testSubstr.json | 9 ++--
3 files changed, 46 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f65fd8d/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
index 0096a13..fbdab8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.expr.fn.impl;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
import io.netty.buffer.ByteBuf;
public class StringFunctionUtil {
@@ -26,15 +28,9 @@ public class StringFunctionUtil {
public static int getUTF8CharLength(ByteBuf buffer, int start, int end) {
int charCount = 0;
- for (int id = start; id < end; id++) {
- byte currentByte = buffer.getByte(id);
-
- if (currentByte < 0x128 || // 1-byte char. First byte is 0xxxxxxx.
- (currentByte & 0xE0) == 0xC0 || // 2-byte char. First byte is 110xxxxx
- (currentByte & 0xF0) == 0xE0 || // 3-byte char. First byte is 1110xxxx
- (currentByte & 0xF8) == 0xF0) { //4-byte char. First byte is 11110xxx
- charCount ++; //Advance the counter, since we find one char.
- }
+ for (int idx = start, charLen = 0; idx < end; idx += charLen) {
+ charLen = utf8CharLen(buffer, idx);
+ ++charCount; //Advance the counter, since we find one char.
}
return charCount;
}
@@ -46,21 +42,14 @@ public class StringFunctionUtil {
public static int getUTF8CharPosition(ByteBuf buffer, int start, int end, int charLength) {
int charCount = 0;
- if (start >=end)
+ if (start >= end)
return -1; //wrong input here.
- for (int id = start; id < end; id++) {
-
- byte currentByte = buffer.getByte(id);
-
- if (currentByte < 0x128 || // 1-byte char. First byte is 0xxxxxxx.
- (currentByte & 0xE0) == 0xC0 || // 2-byte char. First byte is 110xxxxx
- (currentByte & 0xF0) == 0xE0 || // 3-byte char. First byte is 1110xxxx
- (currentByte & 0xF8) == 0xF0) { //4-byte char. First byte is 11110xxx
- charCount ++; //Advance the counter, since we find one char.
- if (charCount == charLength + 1) {
- return id;
- }
+ for (int idx = start, charLen = 0; idx < end; idx += charLen) {
+ charLen = utf8CharLen(buffer, idx);
+ ++charCount; //Advance the counter, since we find one char.
+ if (charCount == charLength + 1) {
+ return idx;
}
}
return end;
@@ -154,4 +143,19 @@ public class StringFunctionUtil {
return (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F') || (c >= '0' && c <= '9');
}
+ private static int utf8CharLen(ByteBuf buffer, int idx) {
+ byte firstByte = buffer.getByte(idx);
+ if (firstByte >= 0) { // 1-byte char. First byte is 0xxxxxxx.
+ return 1;
+ } else if ((firstByte & 0xE0) == 0xC0) { // 2-byte char. First byte is 110xxxxx
+ return 2;
+ } else if ((firstByte & 0xF0) == 0xE0) { // 3-byte char. First byte is 1110xxxx
+ return 3;
+ } else if ((firstByte & 0xF8) == 0xF0) { //4-byte char. First byte is 11110xxx
+ return 4;
+ }
+ throw new DrillRuntimeException("Unexpected byte 0x" + Integer.toString((int)firstByte & 0xff, 16)
+ + " at position " + idx + " encountered while decoding UTF8 string.");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f65fd8d/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStringFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStringFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStringFunctions.java
index 09d1361..cd310b2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStringFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStringFunctions.java
@@ -17,45 +17,36 @@
*/
package org.apache.drill.exec.physical.impl;
-import com.google.common.base.Charsets;
-import com.google.common.io.Resources;
-import com.codahale.metrics.MetricRegistry;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import mockit.Injectable;
import mockit.NonStrictExpectations;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
-import org.apache.drill.exec.expr.holders.BigIntHolder;
-import org.apache.drill.exec.expr.holders.VarCharHolder;
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.planner.PhysicalPlanReader;
-import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.rpc.user.UserServer;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.drill.exec.vector.VarCharVector;
-import org.junit.AfterClass;
-import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
-import org.junit.Assert;
+import org.junit.rules.TestRule;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
public class TestStringFunctions extends ExecTest {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestStringFunctions.class);
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestStringFunctions.class);
DrillConfig c = DrillConfig.create();
PhysicalPlanReader reader;
@@ -72,7 +63,7 @@ public class TestStringFunctions extends ExecTest {
int i = 0;
for (ValueVector v : exec) {
if (v instanceof VarCharVector) {
- res[i++] = new String( ((VarCharVector) v).getAccessor().get(0));
+ res[i++] = new String( ((VarCharVector) v).getAccessor().get(0), Charsets.UTF_8);
} else
res[i++] = v.getAccessor().getObject(0);
}
@@ -98,10 +89,10 @@ public class TestStringFunctions extends ExecTest {
while(exec.next()){
Object [] res = getRunResult(exec);
- assertEquals("return count does not match", res.length, expectedResults.length);
+ assertEquals("return count does not match", expectedResults.length, res.length);
for (int i = 0; i<res.length; i++) {
- assertEquals(String.format("column %s does not match", i), res[i], expectedResults[i]);
+ assertEquals(String.format("column %s does not match", i), expectedResults[i], res[i]);
}
}
@@ -199,7 +190,7 @@ public class TestStringFunctions extends ExecTest {
@Test
public void testSubstr(@Injectable final DrillbitContext bitContext,
@Injectable UserServer.UserClientConnection connection) throws Throwable{
- Object [] expected = new Object[] {"abc", "bcd", "bcdef", "bcdef", "", "", "", ""};
+ Object [] expected = new Object[] {"abc", "bcd", "bcdef", "bcdef", "", "", "", "", "भारत", "वर्ष", "वर्ष"};
runTest(bitContext, connection, expected, "functions/string/testSubstr.json");
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f65fd8d/exec/java-exec/src/test/resources/functions/string/testSubstr.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/functions/string/testSubstr.json b/exec/java-exec/src/test/resources/functions/string/testSubstr.json
index e885381..94467ae 100644
--- a/exec/java-exec/src/test/resources/functions/string/testSubstr.json
+++ b/exec/java-exec/src/test/resources/functions/string/testSubstr.json
@@ -23,15 +23,18 @@
child: 1,
pop:"project",
exprs: [
- { ref: "col1", expr: "substring('abcdef', 1, 3)"},
+ { ref: "col1", expr: "substring('abcdef', 1, 3)"},
{ ref: "col2", expr: "substring('abcdef', 2, 3)"},
{ ref: "col3", expr: "substring('abcdef', 2, 5)"},
{ ref: "col4", expr: "substring('abcdef', 2, 10)"},
{ ref: "col5", expr: "substring('abcdef', 0, 3)"},
{ ref: "col6", expr: "substring('abcdef', -1, 3)"},
{ ref: "col7", expr: "substring('', 1, 2)"},
- { ref: "col8", expr: "substring('abcdef', 10, 2)"}
- ]
+ { ref: "col8", expr: "substring('abcdef', 10, 2)"},
+ { ref: "col9", expr: "substring('भारतवर्ष', 1, 4)"},
+ { ref: "col10", expr: "substring('भारतवर्ष', 5, 4)"},
+ { ref: "col11", expr: "substring('भारतवर्ष', 5, 5)"}
+ ]
},
{
@id: 3,
[6/6] git commit: DRILL-381: Implement SYSTEM and SESSION options.
Posted by ja...@apache.org.
DRILL-381: Implement SYSTEM and SESSION options.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/08923cb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/08923cb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/08923cb8
Branch: refs/heads/master
Commit: 08923cb82a007171e699f5ac0b4e4380b3a5a448
Parents: 9b34f23
Author: Jason Altekruse <al...@gmail.com>
Authored: Sat Mar 29 15:02:58 2014 -0500
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue May 6 18:15:16 2014 -0700
----------------------------------------------------------------------
.../drill/common/logical/PlanProperties.java | 24 +-
.../drill/exec/cache/DistributedCache.java | 9 +-
.../apache/drill/exec/cache/DistributedMap.java | 3 +-
.../exec/cache/DistributedMapDeserializer.java | 28 +++
.../org/apache/drill/exec/cache/HazelCache.java | 55 ++++-
.../exec/cache/JacksonAdvancedSerializer.java | 65 +++++
.../exec/cache/JacksonDrillSerializable.java | 17 --
.../drill/exec/cache/JacksonSerializable.java | 54 +++++
.../org/apache/drill/exec/cache/LocalCache.java | 94 ++++++-
.../exec/cache/SerializationDefinition.java | 36 +++
.../apache/drill/exec/ops/FragmentContext.java | 22 +-
.../org/apache/drill/exec/ops/QueryContext.java | 9 +-
.../drill/exec/ops/QueryContext.java.orig | 135 +++++++++++
.../apache/drill/exec/opt/BasicOptimizer.java | 63 ++++-
.../org/apache/drill/exec/opt/Optimizer.java | 12 +-
.../drill/exec/planner/PhysicalPlanReader.java | 14 +-
.../drill/exec/planner/SimpleExecPlanner.java | 15 +-
.../planner/fragment/SimpleParallelizer.java | 11 +-
.../exec/planner/logical/StoragePlugins.java | 13 +-
.../exec/planner/physical/PlannerSettings.java | 17 +-
.../planner/sql/handlers/DefaultSqlHandler.java | 4 +-
.../planner/sql/handlers/ExplainHandler.java | 12 +-
.../planner/sql/handlers/SetOptionHandler.java | 20 +-
.../sql/handlers/SimpleCommandResult.java | 1 -
.../apache/drill/exec/rpc/user/UserServer.java | 10 +-
.../apache/drill/exec/rpc/user/UserSession.java | 16 +-
.../org/apache/drill/exec/server/Drillbit.java | 2 +
.../apache/drill/exec/server/Drillbit.java.orig | 141 +++++++++++
.../drill/exec/server/DrillbitContext.java | 12 +-
.../drill/exec/server/DrillbitContext.java.orig | 157 ++++++++++++
.../server/options/DrillConfigIterator.java | 85 +++++++
.../server/options/FragmentOptionsManager.java | 81 +++++++
.../drill/exec/server/options/OptionList.java | 23 ++
.../exec/server/options/OptionManager.java | 37 +++
.../exec/server/options/OptionValidator.java | 65 +++++
.../drill/exec/server/options/OptionValue.java | 130 ++++++++++
.../server/options/SessionOptionManager.java | 98 ++++++++
.../exec/server/options/SetOptionException.java | 62 +++++
.../server/options/SystemOptionManager.java | 154 ++++++++++++
.../exec/server/options/TypeValidators.java | 132 ++++++++++
.../drill/exec/store/StoragePluginRegistry.java | 34 +--
.../exec/store/StoragePluginRegistry.java.orig | 243 +++++++++++++++++++
.../exec/store/direct/DirectGroupScan.java | 7 +
.../drill/exec/store/mock/MockGroupScanPOP.java | 27 ++-
.../drill/exec/store/pojo/AbstractWriter.java | 2 +-
.../drill/exec/store/pojo/PojoDataType.java | 72 ++++++
.../drill/exec/store/pojo/PojoRecordReader.java | 20 +-
.../apache/drill/exec/store/pojo/Writers.java | 133 +++++++++-
.../drill/exec/store/sys/DrillbitIterator.java | 61 +++++
.../drill/exec/store/sys/StaticDrillTable.java | 41 ++++
.../drill/exec/store/sys/SystemTable.java | 55 +++++
.../exec/store/sys/SystemTableBatchCreator.java | 44 ++++
.../drill/exec/store/sys/SystemTablePlugin.java | 118 +++++++++
.../exec/store/sys/SystemTablePluginConfig.java | 32 +++
.../drill/exec/store/sys/SystemTableScan.java | 128 ++++++++++
.../exec/work/batch/ControlHandlerImpl.java | 40 +--
.../exec/work/batch/ControlMessageHandler.java | 4 +-
.../apache/drill/exec/work/foreman/Foreman.java | 12 +-
.../work/fragment/NonRootFragmentManager.java | 6 +-
.../apache/drill/exec/work/user/UserWorker.java | 5 +
.../java/org/apache/drill/PlanningBase.java | 18 +-
.../org/apache/drill/TestTpchSingleMode.java | 3 +-
.../exec/physical/impl/TestOptiqPlans.java | 6 +-
.../drill/exec/pop/TestFragmentChecker.java | 29 +--
.../apache/drill/exec/server/TestOptions.java | 41 ++++
.../resources/server/options_session_check.json | 20 ++
.../src/test/resources/server/options_set.json | 24 ++
.../apache/drill/jdbc/test/TestJdbcQuery.java | 3 +
.../org/apache/drill/exec/proto/BitControl.java | 181 +++++++++++++-
protocol/src/main/protobuf/BitControl.proto | 1 +
70 files changed, 3142 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java b/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
index 606f2df..a2942b6 100644
--- a/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
+++ b/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,10 +17,9 @@
******************************************************************************/
package org.apache.drill.common.logical;
+import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
@@ -33,12 +32,13 @@ public class PlanProperties {
public int version;
public Generator generator;
public ResultMode resultMode;
+ public JSONOptions options;
// @JsonInclude(Include.NON_NULL)
public static class Generator{
public String type;
public String info;
-
+
public static enum ResultMode{
EXEC, LOGICAL, PHYSICAL;
}
@@ -52,12 +52,14 @@ public class PlanProperties {
private PlanProperties(@JsonProperty("version") int version,
@JsonProperty("generator") Generator generator,
@JsonProperty("type") PlanType type,
- @JsonProperty("mode") ResultMode resultMode
+ @JsonProperty("mode") ResultMode resultMode,
+ @JsonProperty("options") JSONOptions options
) {
this.version = version;
this.generator = generator;
this.type = type;
this.resultMode = resultMode == null ? ResultMode.EXEC : resultMode;
+ this.options = options;
}
public static PlanPropertiesBuilder builder() {
@@ -69,6 +71,7 @@ public class PlanProperties {
private Generator generator;
private PlanType type;
private ResultMode mode = ResultMode.EXEC;
+ private JSONOptions options;
public PlanPropertiesBuilder type(PlanType type) {
this.type = type;
@@ -84,19 +87,24 @@ public class PlanProperties {
this.generator = new Generator(type, info);
return this;
}
-
+
public PlanPropertiesBuilder resultMode(ResultMode mode){
this.mode = mode;
return this;
}
+ public PlanPropertiesBuilder options(JSONOptions options){
+ this.options = options;
+ return this;
+ }
+
public PlanPropertiesBuilder generator(Generator generator) {
this.generator = generator;
return this;
}
public PlanProperties build() {
- return new PlanProperties(version, generator, type, mode);
+ return new PlanProperties(version, generator, type, mode, options);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
index fdace08..65362e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
@@ -26,15 +26,16 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
public interface DistributedCache extends Closeable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedCache.class);
-
+
public void run() throws DrillbitStartupException;
-
+
// public void updateLocalQueueLength(int length);
-// public List<WorkQueueStatus> getQueueLengths();
-
+// public List<WorkQueueStatus> getQueueLengths();
+
public PlanFragment getFragment(FragmentHandle handle);
public void storeFragment(PlanFragment fragment);
public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz);
public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz);
+ public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz);
public Counter getCounter(String name);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
index 8ea9cd1..1bee5fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
@@ -17,9 +17,10 @@
*/
package org.apache.drill.exec.cache;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
-public interface DistributedMap<V extends DrillSerializable> {
+public interface DistributedMap<V extends DrillSerializable> extends Iterable<Map.Entry<String, V>>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMap.class);
public V get(String key);
public void put(String key, V value);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java
new file mode 100644
index 0000000..453dbfb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.IOException;
+
+public interface DistributedMapDeserializer<V extends DrillSerializable> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMapDeserializer.class);
+
+ public V put(String key, byte[] value) throws IOException;
+
+ public Class getValueClass();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index b701c8f..0149a57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -19,7 +19,9 @@ package org.apache.drill.exec.cache;
import java.io.IOException;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.config.DrillConfig;
@@ -45,6 +47,7 @@ import com.hazelcast.core.IMap;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
+import com.hazelcast.nio.serialization.StreamSerializer;
public class HazelCache implements DistributedCache {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
@@ -55,12 +58,26 @@ public class HazelCache implements DistributedCache {
private HandlePlan fragments;
private Cache<WorkQueueStatus, Integer> endpoints;
private BufferAllocator allocator;
+ private DrillConfig config;
public HazelCache(DrillConfig config, BufferAllocator allocator) {
this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
this.allocator = allocator;
+ this.config = config;
}
+ private <T> void addSer(Config c, StreamSerializer<T> serializer, Class<T> clazz){
+ SerializerConfig sc = new SerializerConfig().setImplementation(serializer).setTypeClass(clazz);
+ c.getSerializationConfig().addSerializerConfig(sc);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private <T> void addJSer(Config c, SerializationDefinition d){
+ SerializerConfig sc = new SerializerConfig().setImplementation(new JacksonAdvancedSerializer(d, config.getMapper())).setTypeClass(d.clazz);
+ c.getSerializationConfig().addSerializerConfig(sc);
+ }
+
+
private class Listener implements MessageListener<HWorkQueueStatus>{
@Override
@@ -68,16 +85,16 @@ public class HazelCache implements DistributedCache {
logger.debug("Received new queue length message.");
endpoints.put(wrapped.getMessageObject().get(), 0);
}
-
+
}
-
+
public void run() {
Config c = new Config();
- SerializerConfig sc = new SerializerConfig() //
- .setImplementation(new HCVectorAccessibleSerializer(allocator)) //
- .setTypeClass(VectorAccessibleSerializable.class);
+ addSer(c, new HCVectorAccessibleSerializer(allocator), VectorAccessibleSerializable.class);
+ addJSer(c, SerializationDefinition.OPTION);
+ addJSer(c, SerializationDefinition.STORAGE_PLUGINS);
+
c.setInstanceName(instanceName);
- c.getSerializationConfig().addSerializerConfig(sc);
c.getGroupConfig().setName(instanceName);
for (String s : DrillConfig.create().getStringList(ExecConstants.HAZELCAST_SUBNETS)) {
logger.debug("Adding interface: {}", s);
@@ -128,7 +145,7 @@ public class HazelCache implements DistributedCache {
public void storeFragment(PlanFragment fragment) {
fragments.put(fragment.getHandle(), fragment);
}
-
+
@Override
public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
@@ -138,12 +155,18 @@ public class HazelCache implements DistributedCache {
@Override
public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
- IMap<String, V> imap = this.instance.getMap(clazz.toString());
+ return getNamedMap(clazz.getName(), clazz);
+ }
+
+
+ @Override
+ public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
+ IMap<String, V> imap = this.instance.getMap(name);
MapConfig myMapConfig = new MapConfig();
myMapConfig.setBackupCount(0);
myMapConfig.setReadBackupData(true);
instance.getConfig().getMapConfigs().put(clazz.toString(), myMapConfig);
- return new HCDistributedMapImpl<V>(imap, clazz);
+ return new HCDistributedMapImpl<V>(imap);
}
@Override
@@ -151,10 +174,12 @@ public class HazelCache implements DistributedCache {
return new HCCounterImpl(this.instance.getAtomicLong(name));
}
+
+
public static class HCDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
- private IMap<String, V> m;
+ private final IMap<String, V> m;
- public HCDistributedMapImpl(IMap<String, V> m, Class<V> clazz) {
+ public HCDistributedMapImpl(IMap<String, V> m) {
this.m = m;
}
@@ -172,7 +197,15 @@ public class HazelCache implements DistributedCache {
public void putIfAbsent(String key, V value, long ttl, TimeUnit timeunit) {
m.putIfAbsent(key, value, ttl, timeunit);
+
+ }
+
+ @Override
+ public Iterator<Entry<String, V>> iterator() {
+ return m.entrySet().iterator();
}
+
+
}
public static class HCDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java
new file mode 100644
index 0000000..edf31aa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.IOException;
+
+import org.apache.drill.common.util.DataInputInputStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+
+public class JacksonAdvancedSerializer<T> implements StreamSerializer<T> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JacksonAdvancedSerializer.class);
+
+ private final Class<?> clazz;
+ private final ObjectMapper mapper;
+ private final int id;
+
+ public JacksonAdvancedSerializer(SerializationDefinition def, ObjectMapper mapper){
+ this.clazz = def.clazz;
+ this.mapper = mapper;
+ this.id = def.id;
+ }
+
+ @Override
+ public int getTypeId() {
+ return id;
+ }
+
+ @Override
+ public void destroy() {
+ }
+
+ @Override
+ public void write(ObjectDataOutput out, T object) throws IOException {
+ out.write(mapper.writeValueAsBytes(object));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T read(ObjectDataInput in) throws IOException {
+ return (T) mapper.readValue(DataInputInputStream.constructInputStream(in), clazz);
+ }
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
index a7b0be2..dcfc1ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
@@ -66,21 +66,4 @@ public abstract class JacksonDrillSerializable<T> implements DrillSerializable,
return obj;
}
- public static class StoragePluginsSerializable extends JacksonDrillSerializable<StoragePlugins> {
-
- public StoragePluginsSerializable(DrillbitContext context, StoragePlugins obj) {
- super(context, obj);
- }
-
- public StoragePluginsSerializable(BufferAllocator allocator) {
- }
-
- public StoragePluginsSerializable() {
- }
-
- @Override
- public void readFromStream(InputStream input) throws IOException {
- readFromStream(input, StoragePlugins.class);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java
new file mode 100644
index 0000000..831db84
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+
+public abstract class JacksonSerializable implements DrillSerializable{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JacksonSerializable.class);
+
+ private void fail(){
+ throw new UnsupportedOperationException("Need to register serialization config for class " + this.getClass().getName()); // rely on external serializer
+
+ }
+
+ @Override
+ public void readData(ObjectDataInput input) throws IOException {
+ fail();
+ }
+
+ @Override
+ public void readFromStream(InputStream input) throws IOException {
+ fail();
+ }
+
+ @Override
+ public void writeData(ObjectDataOutput output) throws IOException {
+ fail();
+ }
+
+ @Override
+ public void writeToStream(OutputStream output) throws IOException {
+ fail();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
index 119764b..0fb4b82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -17,14 +17,15 @@
*/
package org.apache.drill.exec.cache;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -38,6 +39,7 @@ import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -49,11 +51,14 @@ public class LocalCache implements DistributedCache {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
private volatile Map<FragmentHandle, PlanFragment> handles;
+ private volatile ConcurrentMap<String, DistributedMap<?>> namedMaps;
private volatile ConcurrentMap<Class<?>, DistributedMap<?>> maps;
private volatile ConcurrentMap<Class<?>, DistributedMultiMap<?>> multiMaps;
private volatile ConcurrentMap<String, Counter> counters;
private static final BufferAllocator allocator = new TopLevelAllocator();
+ private static final ObjectMapper mapper = DrillConfig.create().getMapper();
+
@Override
public void close() throws IOException {
handles = null;
@@ -65,6 +70,7 @@ public class LocalCache implements DistributedCache {
maps = Maps.newConcurrentMap();
multiMaps = Maps.newConcurrentMap();
counters = Maps.newConcurrentMap();
+ namedMaps = Maps.newConcurrentMap();
}
@Override
@@ -78,7 +84,7 @@ public class LocalCache implements DistributedCache {
// logger.debug("Storing fragment: {}", fragment);
handles.put(fragment.getHandle(), fragment);
}
-
+
@Override
public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
DistributedMultiMap<V> mmap = (DistributedMultiMap<V>) multiMaps.get(clazz);
@@ -101,6 +107,18 @@ public class LocalCache implements DistributedCache {
}
}
+
+ @Override
+ public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
+ DistributedMap m = namedMaps.get(clazz);
+ if (m == null) {
+ namedMaps.putIfAbsent(name, new LocalDistributedMapImpl<V>(clazz));
+ return (DistributedMap<V>) namedMaps.get(name);
+ } else {
+ return m;
+ }
+ }
+
@Override
public Counter getCounter(String name) {
Counter c = counters.get(name);
@@ -113,6 +131,16 @@ public class LocalCache implements DistributedCache {
}
public static ByteArrayDataOutput serialize(DrillSerializable obj) {
+ if(obj instanceof JacksonSerializable){
+ try{
+ ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ out.write(mapper.writeValueAsBytes(obj));
+ return out;
+ }catch(Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+
ByteArrayDataOutput out = ByteStreams.newDataOutput();
OutputStream outputStream = DataOutputOutputStream.constructOutputStream(out);
try {
@@ -129,6 +157,14 @@ public class LocalCache implements DistributedCache {
}
public static <V extends DrillSerializable> V deserialize(byte[] bytes, Class<V> clazz) {
+ if(JacksonSerializable.class.isAssignableFrom(clazz)){
+ try{
+ return (V) mapper.readValue(bytes, clazz);
+ }catch(Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+
ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
InputStream inputStream = DataInputInputStream.constructInputStream(in);
try {
@@ -165,8 +201,8 @@ public class LocalCache implements DistributedCache {
}
public static class LocalDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
- private ConcurrentMap<String, ByteArrayDataOutput> m;
- private Class<V> clazz;
+ protected ConcurrentMap<String, ByteArrayDataOutput> m;
+ protected Class<V> clazz;
public LocalDistributedMapImpl(Class<V> clazz) {
m = Maps.newConcurrentMap();
@@ -196,6 +232,56 @@ public class LocalCache implements DistributedCache {
m.putIfAbsent(key, serialize(value));
logger.warn("Expiration not implemented in local map cache");
}
+
+ private class DeserializingTransformer implements Iterator<Map.Entry<String, V> >{
+ private Iterator<Map.Entry<String, ByteArrayDataOutput>> inner;
+
+ public DeserializingTransformer(Iterator<Entry<String, ByteArrayDataOutput>> inner) {
+ super();
+ this.inner = inner;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return inner.hasNext();
+ }
+
+ @Override
+ public Entry<String, V> next() {
+ return newEntry(inner.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ public Entry<String, V> newEntry(final Entry<String, ByteArrayDataOutput> input) {
+ return new Map.Entry<String, V>(){
+
+ @Override
+ public String getKey() {
+ return input.getKey();
+ }
+
+ @Override
+ public V getValue() {
+ return deserialize(input.getValue().toByteArray(), clazz);
+ }
+
+ @Override
+ public V setValue(V value) {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+ }
+
+ }
+ @Override
+ public Iterator<Entry<String, V>> iterator() {
+ return new DeserializingTransformer(m.entrySet().iterator());
+ }
}
public static class LocalCounterImpl implements Counter {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
new file mode 100644
index 0000000..95ba434
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.server.options.OptionValue;
+
+public enum SerializationDefinition {
+
+ OPTION(3002, OptionValue.class),
+ STORAGE_PLUGINS(3003, StoragePlugins.class)
+ ;
+ public final int id;
+ public final Class<?> clazz;
+
+ SerializationDefinition(int id, Class<?> clazz){
+ this.id = id;
+ this.clazz = clazz;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index f3bcfef..2a6ab0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -27,6 +27,7 @@ import net.hydromatic.optiq.tools.Frameworks;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.compile.ClassTransformer;
import org.apache.drill.exec.compile.QueryClassLoader;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -42,6 +43,9 @@ import org.apache.drill.exec.rpc.control.ControlTunnel;
import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.FragmentOptionsManager;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.work.batch.IncomingBuffers;
import com.beust.jcommander.internal.Lists;
@@ -68,12 +72,13 @@ public class FragmentContext implements Closeable {
private IncomingBuffers buffers;
private final long queryStartTime;
private final int rootFragmentTimeZone;
+ private final OptionManager sessionOptions;
private volatile Throwable failureCause;
private volatile boolean failed = false;
public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
- FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException {
+ FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException, ExecutionSetupException {
this.loader = new QueryClassLoader(true);
this.transformer = new ClassTransformer();
this.stats = new FragmentStats(dbContext.getMetrics());
@@ -85,9 +90,24 @@ public class FragmentContext implements Closeable {
this.rootFragmentTimeZone = fragment.getTimeZone();
logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
logger.debug("Fragment max allocation: {}", fragment.getMemMax());
+ try{
+ OptionList list;
+ if(!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()){
+ list = new OptionList();
+ }else{
+ list = dbContext.getConfig().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
+ }
+ this.sessionOptions = new FragmentOptionsManager(context.getOptionManager(), list);
+ }catch(Exception e){
+ throw new ExecutionSetupException("Failure while reading plan options.", e);
+ }
this.allocator = context.getAllocator().getChildAllocator(fragment.getHandle(), fragment.getMemInitial(), fragment.getMemMax());
}
+ public OptionManager getOptions(){
+ return sessionOptions;
+ }
+
public void setBuffers(IncomingBuffers buffers) {
this.buffers = buffers;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 7e3b63d..f6ce04f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -23,6 +23,7 @@ import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.tools.Frameworks;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExpressionParsingException;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -33,7 +34,10 @@ import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataConnectionCreator;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.eigenbase.sql.SqlLiteral;
public class QueryContext{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
@@ -52,7 +56,7 @@ public class QueryContext{
this.workBus = drllbitContext.getWorkBus();
this.session = session;
this.timer = new Multitimer<>(QuerySetup.class);
- this.plannerSettings = new PlannerSettings();
+ this.plannerSettings = new PlannerSettings(session.getOptions());
}
public PlannerSettings getPlannerSettings(){
@@ -79,6 +83,9 @@ public class QueryContext{
return rootSchema;
}
+ public OptionManager getOptions(){
+ return session.getOptions();
+ }
public DrillbitEndpoint getCurrentEndpoint(){
return drillbitContext.getEndpoint();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java.orig
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java.orig b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java.orig
new file mode 100644
index 0000000..4f050a6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java.orig
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.util.Collection;
+
+import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.tools.Frameworks;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExpressionParsingException;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.rpc.data.DataConnectionCreator;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+public class QueryContext{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
+
+ private final QueryId queryId;
+ private final DrillbitContext drillbitContext;
+ private final WorkEventBus workBus;
+ private UserSession session;
+ public final Multitimer<QuerySetup> timer;
+ private final PlannerSettings plannerSettings;
+
+ public QueryContext(UserSession session, QueryId queryId, DrillbitContext drllbitContext) {
+ super();
+ this.queryId = queryId;
+ this.drillbitContext = drllbitContext;
+ this.workBus = drllbitContext.getWorkBus();
+ this.session = session;
+ this.timer = new Multitimer<>(QuerySetup.class);
+ this.plannerSettings = new PlannerSettings();
+ }
+
+ public PlannerSettings getPlannerSettings(){
+ return plannerSettings;
+ }
+
+ public UserSession getSession(){
+ return session;
+ }
+
+ public SchemaPlus getNewDefaultSchema(){
+ SchemaPlus rootSchema = getRootSchema();
+ SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema);
+ if(defaultSchema == null){
+ return rootSchema;
+ }else{
+ return defaultSchema;
+ }
+ }
+
+ public SchemaPlus getRootSchema(){
+ SchemaPlus rootSchema = Frameworks.createRootSchema();
+ drillbitContext.getSchemaFactory().registerSchemas(session.getUser(), rootSchema);
+ return rootSchema;
+ }
+
+<<<<<<< HEAD
+=======
+ public Object getOptionValue(String name){
+ return drillbitContext.getGlobalDrillOptions().getOption(name);
+ }
+
+ public void setOptionValue(String name, String value) throws ExpressionParsingException {
+ drillbitContext.getGlobalDrillOptions().setOption(name, value);
+ }
+
+>>>>>>> Drill 381 - implementation of session and global options.
+ public DrillbitEndpoint getCurrentEndpoint(){
+ return drillbitContext.getEndpoint();
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public StoragePluginRegistry getStorage(){
+ return drillbitContext.getStorage();
+ }
+
+
+ public DistributedCache getCache(){
+ return drillbitContext.getCache();
+ }
+
+ public Collection<DrillbitEndpoint> getActiveEndpoints(){
+ return drillbitContext.getBits();
+ }
+
+ public PhysicalPlanReader getPlanReader(){
+ return drillbitContext.getPlanReader();
+ }
+
+ public DataConnectionCreator getDataConnectionsPool(){
+ return drillbitContext.getDataConnectionsPool();
+ }
+
+ public DrillConfig getConfig(){
+ return drillbitContext.getConfig();
+ }
+
+ public WorkEventBus getWorkBus(){
+ return workBus;
+ }
+
+ public FunctionImplementationRegistry getFunctionRegistry(){
+ return drillbitContext.getFunctionImplementationRegistry();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index d7d5ccb..78c0e3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.LogicalPlan;
@@ -53,8 +54,8 @@ import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.StreamingAggregate;
-import org.apache.drill.exec.physical.config.HashAggregate;
-import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.StoragePlugin;
import org.eigenbase.rel.RelFieldCollation.Direction;
import org.eigenbase.rel.RelFieldCollation.NullDirection;
@@ -62,13 +63,46 @@ import org.eigenbase.rel.RelFieldCollation.NullDirection;
import com.beust.jcommander.internal.Lists;
public class BasicOptimizer extends Optimizer{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
private DrillConfig config;
private QueryContext context;
+ private UserServer.UserClientConnection userSession;
- public BasicOptimizer(DrillConfig config, QueryContext context){
+ public BasicOptimizer(DrillConfig config, QueryContext context, UserServer.UserClientConnection userSession){
this.config = config;
this.context = context;
+ this.userSession = userSession;
+ logCurrentOptionValues();
+ }
+
+ private void logCurrentOptionValues(){
+// Iterator<DrillOptionValue> optionVals = userSession.getSessionOptionIterator();
+// DrillOptionValue val = null;
+// String output = "";
+// output += "SessionOptions: {\n";
+// for ( ;optionVals.hasNext(); val = optionVals.next()){
+// if (val != null) {
+// output += val.getOptionName() + ":" + val.getValue() + ",\n";
+// }
+// }
+// output += "}";
+// logger.debug(output);
+ }
+
+ /**
+ * Get the current value of an option. Session options override global options.
+ *
+ * @param name - the name of the option
+ * @return - value of the option
+ */
+ private Object getOptionValue(String name) {
+// Object val = userSession.getSessionLevelOption(name);
+// if (val == null) {
+// context.getOptionValue(name);
+// }
+// return val;
+ return null;
}
@Override
@@ -90,7 +124,8 @@ public class BasicOptimizer extends Optimizer{
PlanProperties props = PlanProperties.builder()
.type(PlanProperties.PlanType.APACHE_DRILL_PHYSICAL)
.version(plan.getProperties().version)
- .generator(plan.getProperties().generator).build();
+ .generator(plan.getProperties().generator)
+ .options(new JSONOptions(context.getOptions().getSessionOptionList())).build();
PhysicalPlan p = new PhysicalPlan(props, physOps);
return p;
//return new PhysicalPlan(props, physOps);
@@ -103,10 +138,20 @@ public class BasicOptimizer extends Optimizer{
public static class BasicOptimizationContext implements OptimizationContext {
+ private OptionManager ops;
+ public BasicOptimizationContext(QueryContext c){
+ this.ops = c.getOptions();
+ }
+
@Override
public int getPriority() {
return 1;
}
+
+ @Override
+ public OptionManager getOptions() {
+ return ops;
+ }
}
private class LogicalConverter extends AbstractLogicalVisitor<PhysicalOperator, Object, OptimizerException> {
@@ -121,10 +166,10 @@ public class BasicOptimizer extends Optimizer{
@Override
public PhysicalOperator visitGroupingAggregate(GroupingAggregate groupBy, Object value) throws OptimizerException {
-
+
List<Ordering> orderDefs = Lists.newArrayList();
-
+
PhysicalOperator input = groupBy.getInput().accept(this, value);
if(groupBy.getKeys().length > 0){
@@ -133,7 +178,7 @@ public class BasicOptimizer extends Optimizer{
}
input = new Sort(input, orderDefs, false);
}
-
+
StreamingAggregate sa = new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f);
return sa;
}
@@ -165,7 +210,7 @@ public class BasicOptimizer extends Optimizer{
}
leftOp = new Sort(leftOp, leftOrderDefs, false);
leftOp = new SelectionVectorRemover(leftOp);
-
+
PhysicalOperator rightOp = join.getRight().accept(this, value);
List<Ordering> rightOrderDefs = Lists.newArrayList();
for(JoinCondition jc : join.getConditions()){
@@ -173,7 +218,7 @@ public class BasicOptimizer extends Optimizer{
}
rightOp = new Sort(rightOp, rightOrderDefs, false);
rightOp = new SelectionVectorRemover(rightOp);
-
+
MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()), join.getJoinType());
return new SelectionVectorRemover(mjp);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
index c7069c3..34d0622 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
@@ -24,23 +24,25 @@ import org.apache.drill.common.exceptions.DrillConfigurationException;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.exec.exception.OptimizerException;
import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.server.options.OptionManager;
public abstract class Optimizer implements Closeable{
-
+
public static String OPTIMIZER_IMPL_KEY = "drill.exec.optimizer.implementation";
-
+
public abstract void init(DrillConfig config);
-
+
public abstract PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) throws OptimizerException;
public abstract void close();
-
+
public static Optimizer getOptimizer(DrillConfig config) throws DrillConfigurationException{
Optimizer o = config.getInstanceOf(OPTIMIZER_IMPL_KEY, Optimizer.class);
o.init(config);
return o;
}
-
+
public interface OptimizationContext{
public int getPriority();
+ public OptionManager getOptions();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
index 167a992..8d77136 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -30,14 +30,14 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.record.MajorTypeSerDe;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.store.StoragePluginRegistry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.module.SimpleModule;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.StoragePluginRegistry;
public class PhysicalPlanReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class);
@@ -56,8 +56,8 @@ public class PhysicalPlanReader {
.addDeserializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.De()) //
.addSerializer(MajorType.class, new MajorTypeSerDe.Se())
.addDeserializer(MajorType.class, new MajorTypeSerDe.De());
-
-
+
+
mapper.registerModule(deserModule);
mapper.registerSubtypes(PhysicalOperatorUtil.getSubTypes(config));
InjectableValues injectables = new InjectableValues.Std() //
@@ -78,10 +78,14 @@ public class PhysicalPlanReader {
this(config, mapper, endpoint, null);
}
+ public String writeJson(OptionList list) throws JsonProcessingException{
+ return mapper.writeValueAsString(list);
+ }
+
public String writeJson(PhysicalOperator op) throws JsonProcessingException{
return mapper.writeValueAsString(op);
}
-
+
public PhysicalPlan readPhysicalPlan(String json) throws JsonProcessingException, IOException {
logger.debug("Reading physical plan {}", json);
return physicalPlanReader.readValue(json);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
index dff2dd3..4da6500 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.planner.fragment.StatsCollector;
+import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.work.QueryWorkUnit;
/**
@@ -34,25 +35,25 @@ import org.apache.drill.exec.work.QueryWorkUnit;
*/
public class SimpleExecPlanner implements ExecPlanner{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleExecPlanner.class);
-
+
private MakeFragmentsVisitor fragmenter = new MakeFragmentsVisitor();
private SimpleParallelizer parallelizer = new SimpleParallelizer();
@Override
public QueryWorkUnit getWorkUnit(QueryContext context, PhysicalPlan plan, int maxWidth) throws ExecutionSetupException {
-
+
// get the root physical operator and split the plan into sub fragments.
PhysicalOperator root = plan.getSortedOperators(false).iterator().next();
Fragment fragmentRoot = root.accept(fragmenter, null);
-
+
// generate a planning set and collect stats.
PlanningSet planningSet = StatsCollector.collectStats(fragmentRoot);
int maxWidthPerEndpoint = context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT);
-
- return parallelizer.getFragments(context.getCurrentEndpoint(), context.getQueryId(), context.getActiveEndpoints(),
+
+ return parallelizer.getFragments(new OptionList(), context.getCurrentEndpoint(), context.getQueryId(), context.getActiveEndpoints(),
context.getPlanReader(), fragmentRoot, planningSet, maxWidth, maxWidthPerEndpoint);
-
-
+
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 6e951df..c34869d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -31,6 +32,7 @@ import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.work.QueryWorkUnit;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -67,13 +69,13 @@ public class SimpleParallelizer {
* @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes.
* @throws ExecutionSetupException
*/
- public QueryWorkUnit getFragments(DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
+ public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
int globalMaxWidth, int maxWidthPerEndpoint) throws ExecutionSetupException {
assignEndpoints(activeEndpoints, planningSet, globalMaxWidth, maxWidthPerEndpoint);
- return generateWorkUnit(foremanNode, queryId, reader, rootNode, planningSet);
+ return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet);
}
- private QueryWorkUnit generateWorkUnit(DrillbitEndpoint foremanNode, QueryId queryId, PhysicalPlanReader reader, Fragment rootNode,
+ private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, PhysicalPlanReader reader, Fragment rootNode,
PlanningSet planningSet) throws ExecutionSetupException {
List<PlanFragment> fragments = Lists.newArrayList();
@@ -109,8 +111,10 @@ public class SimpleParallelizer {
// get plan as JSON
String plan;
+ String optionsData;
try {
plan = reader.writeJson(root);
+ optionsData = reader.writeJson(options);
} catch (JsonProcessingException e) {
throw new FragmentSetupException("Failure while trying to convert fragment into json.", e);
}
@@ -135,6 +139,7 @@ public class SimpleParallelizer {
.setTimeZone(timeZone)//
.setMemInitial(wrapper.getInitialAllocation())//
.setMemMax(wrapper.getMaxAllocation())
+ .setOptionsJson(optionsData)
.build();
if (isRootNode) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
index 939b77c..d296eb1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
@@ -26,21 +26,22 @@ import java.util.Map.Entry;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.cache.JacksonSerializable;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.io.Resources;
-public class StoragePlugins implements Iterable<Map.Entry<String, StoragePluginConfig>>{
-
+public class StoragePlugins extends JacksonSerializable implements Iterable<Map.Entry<String, StoragePluginConfig>>{
+
private Map<String, StoragePluginConfig> storage;
-
+
@JsonCreator
public StoragePlugins(@JsonProperty("storage") Map<String, StoragePluginConfig> storage){
this.storage = storage;
}
-
+
public static void main(String[] args) throws Exception{
DrillConfig config = DrillConfig.create();
String data = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8);
@@ -88,6 +89,6 @@ public class StoragePlugins implements Iterable<Map.Entry<String, StoragePluginC
}
return storage.equals(((StoragePlugins) obj).getStorage());
}
-
-
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index e6e99c0..5e49451 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -19,17 +19,24 @@ package org.apache.drill.exec.planner.physical;
import net.hydromatic.optiq.tools.FrameworkContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValidator;
+import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
+
public class PlannerSettings implements FrameworkContext{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlannerSettings.class);
- private boolean singleMode;
- public boolean isSingleMode() {
- return singleMode;
+ public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges", false);
+
+ public OptionManager options;
+
+ public PlannerSettings(OptionManager options){
+ this.options = options;
}
- public void setSingleMode(boolean singleMode) {
- this.singleMode = singleMode;
+ public boolean isSingleMode() {
+ return options.getOption(EXCHANGE.getOptionName()).bool_val;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 0e9f798..116017c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -25,6 +25,7 @@ import net.hydromatic.optiq.tools.Planner;
import net.hydromatic.optiq.tools.RelConversionException;
import net.hydromatic.optiq.tools.ValidationException;
+import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.logical.PlanProperties;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
import org.apache.drill.common.logical.PlanProperties.PlanPropertiesBuilder;
@@ -34,12 +35,10 @@ import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.logical.DrillRel;
-import org.apache.drill.exec.planner.logical.DrillRuleSets;
import org.apache.drill.exec.planner.logical.DrillScreenRel;
import org.apache.drill.exec.planner.logical.DrillStoreRel;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.eigenbase.rel.RelNode;
@@ -132,6 +131,7 @@ public class DefaultSqlHandler implements SqlHandler{
PlanPropertiesBuilder propsBuilder = PlanProperties.builder();
propsBuilder.type(PlanType.APACHE_DRILL_PHYSICAL);
propsBuilder.version(1);
+ propsBuilder.options(new JSONOptions(context.getOptions().getSessionOptionList()));
propsBuilder.resultMode(ResultMode.EXEC);
propsBuilder.generator(this.getClass().getSimpleName(), "");
return new PhysicalPlan(propsBuilder.build(), getPops(op));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
index dec9222..86ce6c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
@@ -59,7 +59,7 @@ public class ExplainHandler extends DefaultSqlHandler{
log("Drill Logical", drel);
if(mode == ResultMode.LOGICAL){
- LogicalExplain logicalResult = new LogicalExplain(drel);
+ LogicalExplain logicalResult = new LogicalExplain(drel, level, context);
return DirectPlan.createDirectPlan(context, logicalResult);
}
@@ -68,7 +68,7 @@ public class ExplainHandler extends DefaultSqlHandler{
PhysicalOperator pop = convertToPop(prel);
PhysicalPlan plan = convertToPlan(pop);
log("Drill Plan", plan);
- PhysicalExplain physicalResult = new PhysicalExplain(prel, plan);
+ PhysicalExplain physicalResult = new PhysicalExplain(prel, plan, level, context);
return DirectPlan.createDirectPlan(context, physicalResult);
}
@@ -93,11 +93,11 @@ public class ExplainHandler extends DefaultSqlHandler{
}
- public class LogicalExplain{
+ public static class LogicalExplain{
public String text;
public String json;
- public LogicalExplain(RelNode node){
+ public LogicalExplain(RelNode node, SqlExplainLevel level, QueryContext context){
this.text = RelOptUtil.toString(node, level);
DrillImplementor implementor = new DrillImplementor(new DrillParseContext(), ResultMode.LOGICAL);
implementor.go( (DrillRel) node);
@@ -106,11 +106,11 @@ public class ExplainHandler extends DefaultSqlHandler{
}
}
- public class PhysicalExplain{
+ public static class PhysicalExplain{
public String text;
public String json;
- public PhysicalExplain(RelNode node, PhysicalPlan plan){
+ public PhysicalExplain(RelNode node, PhysicalPlan plan, SqlExplainLevel level, QueryContext context){
this.text = RelOptUtil.toString(node, level);
this.json = plan.unparse(context.getConfig().getMapper().writer());
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
index 64be891..ae3960a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
@@ -25,6 +25,7 @@ import net.hydromatic.optiq.tools.ValidationException;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.eigenbase.sql.SqlLiteral;
import org.eigenbase.sql.SqlNode;
import org.eigenbase.sql.SqlSetOption;
@@ -46,10 +47,23 @@ public class SetOptionHandler implements SqlHandler{
String scope = option.getScope();
String name = option.getName();
SqlNode value = option.getValue();
- if(name.equals("NO_EXCHANGES")){
- context.getPlannerSettings().setSingleMode(true);
+ if(value instanceof SqlLiteral){
+ switch(scope.toLowerCase()){
+ case "session":
+ context.getOptions().setOption(option.getName(), (SqlLiteral) value);
+ break;
+ case "system":
+ context.getOptions().getSystemManager().setOption(name, (SqlLiteral) value);
+ break;
+ default:
+ throw new ValidationException("Invalid OPTION scope. Scope must be SESSION or SYSTEM.");
+ }
+
+ }else{
+ throw new ValidationException("Sql options can only be literals.");
}
- return DirectPlan.createDirectPlan(context, true, "disabled exchanges.");
+
+ return DirectPlan.createDirectPlan(context, true, String.format("%s updated.", name));
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SimpleCommandResult.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SimpleCommandResult.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SimpleCommandResult.java
index 6dd7e7b..07d26b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SimpleCommandResult.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SimpleCommandResult.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.planner.sql.handlers;
public class SimpleCommandResult {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleCommandResult.class);
public boolean ok;
public String summary;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index acd8412..3ee25d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -17,13 +17,13 @@
*/
package org.apache.drill.exec.rpc.user;
-import java.io.IOException;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
+import java.io.IOException;
+
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -100,12 +100,13 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
public class UserClientConnection extends RemoteConnection {
private UserSession session;
+
public UserClientConnection(Channel channel) {
super(channel);
}
void setUser(UserCredentials credentials, UserProperties props) throws IOException{
- session = new UserSession(credentials, props);
+ session = new UserSession(worker.getSystemOptions(), credentials, props);
}
public UserSession getSession(){
@@ -114,7 +115,6 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
// logger.debug("Sending result to client with {}", result);
-
send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
}
@@ -122,6 +122,8 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
public BufferAllocator getAllocator() {
return alloc;
}
+
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index f27317c..86c4bad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -18,14 +18,18 @@
package org.apache.drill.exec.rpc.user;
import java.io.IOException;
+import java.util.Iterator;
import java.util.Map;
-import com.google.common.collect.Maps;
import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.proto.UserProtos.Property;
import org.apache.drill.exec.proto.UserProtos.UserProperties;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.SessionOptionManager;
+
+import com.google.common.collect.Maps;
public class UserSession {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserSession.class);
@@ -36,10 +40,11 @@ public class UserSession {
private boolean enableExchanges = true;
private UserCredentials credentials;
private Map<String, String> properties;
+ private OptionManager options;
- public UserSession(UserCredentials credentials, UserProperties properties) throws IOException{
+ public UserSession(OptionManager systemOptions, UserCredentials credentials, UserProperties properties) throws IOException{
this.credentials = credentials;
-
+ this.options = new SessionOptionManager(systemOptions);
this.properties = Maps.newHashMap();
if (properties == null) return;
for (int i=0; i<properties.getPropertiesCount(); i++) {
@@ -48,6 +53,10 @@ public class UserSession {
}
}
+ public OptionManager getOptions(){
+ return options;
+ }
+
public DrillUser getUser(){
return user;
}
@@ -95,4 +104,5 @@ public class UserSession {
}
return schema;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 411a76c..7f607a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMultiMap;
import org.apache.drill.exec.cache.HazelCache;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
@@ -96,6 +97,7 @@ public class Drillbit implements Closeable{
manager.start(md, cache, engine.getController(), engine.getDataConnectionCreator(), coord);
cache.run();
manager.getContext().getStorage().init();
+ manager.getContext().getOptionManager().init();
handle = coord.register(md);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java.orig
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java.orig b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java.orig
new file mode 100644
index 0000000..5e72717
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java.orig
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server;
+
+import java.io.Closeable;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMultiMap;
+import org.apache.drill.exec.cache.HazelCache;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
+import org.apache.drill.exec.coord.ZKClusterCoordinator;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.service.ServiceEngine;
+import org.apache.drill.exec.work.WorkManager;
+
+import com.google.common.io.Closeables;
+
+/**
+ * Starts, tracks and stops all the required services for a Drillbit daemon to work.
+ */
+public class Drillbit implements Closeable{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class);
+
+ public static Drillbit start(StartupOptions options) throws DrillbitStartupException {
+ return start(DrillConfig.create(options.getConfigLocation()));
+ }
+
+ public static Drillbit start(DrillConfig config) throws DrillbitStartupException {
+ Drillbit bit;
+ try {
+ logger.debug("Setting up Drillbit.");
+ bit = new Drillbit(config, null);
+ } catch (Exception ex) {
+ throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
+ }
+ try {
+ logger.debug("Starting Drillbit.");
+ bit.run();
+ } catch (Exception e) {
+ throw new DrillbitStartupException("Failure during initial startup of Drillbit.", e);
+ }
+ return bit;
+ }
+
+ public static void main(String[] cli) throws DrillbitStartupException {
+ StartupOptions options = StartupOptions.parse(cli);
+ start(options);
+ }
+
+ final ClusterCoordinator coord;
+ final ServiceEngine engine;
+ final DistributedCache cache;
+ final WorkManager manager;
+ final BootStrapContext context;
+
+ private volatile RegistrationHandle handle;
+
+ public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception {
+ if(serviceSet != null){
+ this.context = new BootStrapContext(config);
+ this.manager = new WorkManager(context);
+ this.coord = serviceSet.getCoordinator();
+ this.engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context, manager.getWorkBus(), manager.getDataHandler());
+ this.cache = serviceSet.getCache();
+ }else{
+ Runtime.getRuntime().addShutdownHook(new ShutdownThread(config));
+ this.context = new BootStrapContext(config);
+ this.manager = new WorkManager(context);
+ this.coord = new ZKClusterCoordinator(config);
+ this.engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context, manager.getWorkBus(), manager.getDataHandler());
+ this.cache = new HazelCache(config, context.getAllocator());
+ }
+ }
+
+ public void run() throws Exception {
+ coord.start(10000);
+ DrillbitEndpoint md = engine.start();
+ cache.run();
+<<<<<<< HEAD
+ manager.getContext().getStorage().init();
+=======
+ manager.start(md, cache, engine.getController(), engine.getDataConnectionCreator(), coord);
+>>>>>>> Drill 381 - implementation of session and global options.
+ handle = coord.register(md);
+ }
+
+ public void close() {
+ if (coord != null) coord.unregister(handle);
+
+ try {
+ Thread.sleep(context.getConfig().getInt(ExecConstants.ZK_REFRESH) * 2);
+ } catch (InterruptedException e) {
+ logger.warn("Interrupted while sleeping during coordination deregistration.");
+ }
+
+ Closeables.closeQuietly(engine);
+ Closeables.closeQuietly(coord);
+ Closeables.closeQuietly(manager);
+ Closeables.closeQuietly(context);
+ logger.info("Shutdown completed.");
+ }
+
+ private class ShutdownThread extends Thread {
+ ShutdownThread(DrillConfig config) {
+ this.setName("ShutdownHook");
+ }
+
+ @Override
+ public void run() {
+ logger.info("Received shutdown request.");
+ close();
+ }
+
+ }
+ public ClusterCoordinator getCoordinator(){
+ return coord;
+ }
+
+ public DrillbitContext getContext(){
+ return this.manager.getContext();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 1fd30e3..f0a47f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -22,7 +22,6 @@ import io.netty.channel.nio.NioEventLoopGroup;
import java.util.Collection;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
@@ -33,9 +32,10 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataConnectionCreator;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
-import org.apache.drill.exec.store.StoragePlugin;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
@@ -55,6 +55,7 @@ public class DrillbitContext {
private final Controller controller;
private final WorkEventBus workBus;
private final FunctionImplementationRegistry functionRegistry;
+ private final SystemOptionManager systemOptions;
public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, DistributedCache cache, WorkEventBus workBus) {
super();
@@ -73,6 +74,9 @@ public class DrillbitContext {
this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storagePlugins);
this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig());
this.functionRegistry = new FunctionImplementationRegistry(context.getConfig());
+ this.systemOptions = new SystemOptionManager(cache);
+
+// this.globalDrillOptions = new DistributedGlobalOptions(this.cache);
}
public FunctionImplementationRegistry getFunctionImplementationRegistry() {
@@ -83,6 +87,10 @@ public class DrillbitContext {
return workBus;
}
+ public SystemOptionManager getOptionManager() {
+ return systemOptions;
+ }
+
public DrillbitEndpoint getEndpoint(){
return endpoint;
}
[3/6] git commit: DRILL-645: Correct string encodings in code template
Posted by ja...@apache.org.
DRILL-645: Correct string encodings in code template
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/9b34f23d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/9b34f23d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/9b34f23d
Branch: refs/heads/master
Commit: 9b34f23dd8291c7c4c88e0421d02d56e65ae0cd2
Parents: 0f65fd8
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Tue May 6 02:42:59 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue May 6 15:40:38 2014 -0700
----------------------------------------------------------------------
.../codegen/templates/CastFunctionsSrcVarLen.java | 16 ++++++++--------
.../src/main/codegen/templates/CastVarCharDate.java | 2 +-
.../main/codegen/templates/CastVarCharInterval.java | 2 +-
.../DateToCharFunctions.java | 2 +-
.../ToDateTypeFunctions.java | 4 ++--
.../templates/Decimal/CastVarCharDecimal.java | 10 +++++-----
.../codegen/templates/ObjectInspectorHelper.java | 4 +++-
.../src/main/codegen/templates/ValueHolders.java | 10 +++++++++-
.../codegen/templates/VariableLengthVectors.java | 12 ++++++++++--
.../drill/exec/physical/impl/TestHiveUDFs.java | 14 +++++++-------
10 files changed, 47 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/CastFunctionsSrcVarLen.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/CastFunctionsSrcVarLen.java b/exec/java-exec/src/main/codegen/templates/CastFunctionsSrcVarLen.java
index 5855381..aa216cd 100644
--- a/exec/java-exec/src/main/codegen/templates/CastFunctionsSrcVarLen.java
+++ b/exec/java-exec/src/main/codegen/templates/CastFunctionsSrcVarLen.java
@@ -21,7 +21,7 @@
{
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
- throw new NumberFormatException(new String(buf));
+ throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));
}
</#macro>
@@ -58,7 +58,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
//TODO: need capture format exception, and issue SQLERR code.
- out.value = ${type.javaType}.parse${type.parse}(new String(buf));
+ out.value = ${type.javaType}.parse${type.parse}(new String(buf, com.google.common.base.Charsets.UTF_8));
<#elseif type.to=="Int" || type.to == "BigInt">
@@ -66,7 +66,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
//empty, not a valid number
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
- throw new NumberFormatException(new String(buf));
+ throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));
}
int readIndex = in.start;
@@ -77,7 +77,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
//only one single '-'
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
- throw new NumberFormatException(new String(buf));
+ throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));
}
int radix = 10;
@@ -91,13 +91,13 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
if (digit == -1) {
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
- throw new NumberFormatException(new String(buf));
+ throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));
}
//overflow
if (max > result) {
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
- throw new NumberFormatException(new String(buf));
+ throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));
}
${type.primeType} next = result * radix - digit;
@@ -106,7 +106,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
if (next > result) {
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
- throw new NumberFormatException(new String(buf));
+ throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));
}
result = next;
}
@@ -116,7 +116,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
if (result < 0) {
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
- throw new NumberFormatException(new String(buf));
+ throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java b/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
index 39b8b41..249b555 100644
--- a/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
+++ b/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
@@ -54,7 +54,7 @@ public class Cast${type.from}To${type.to} implements DrillSimpleFunc {
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
- String input = new String(buf);
+ String input = new String(buf, com.google.common.base.Charsets.UTF_8);
<#if type.to == "Date">
org.joda.time.format.DateTimeFormatter f = org.apache.drill.exec.expr.fn.impl.DateUtility.getDateTimeFormatter();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/CastVarCharInterval.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/CastVarCharInterval.java b/exec/java-exec/src/main/codegen/templates/CastVarCharInterval.java
index ebb1bd9..135abdd 100644
--- a/exec/java-exec/src/main/codegen/templates/CastVarCharInterval.java
+++ b/exec/java-exec/src/main/codegen/templates/CastVarCharInterval.java
@@ -55,7 +55,7 @@ public class Cast${type.from}To${type.to} implements DrillSimpleFunc {
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
- String input = new String(buf);
+ String input = new String(buf, com.google.common.base.Charsets.UTF_8);
// Parse the ISO format
org.joda.time.Period period = org.joda.time.Period.parse(input);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateToCharFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateToCharFunctions.java b/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateToCharFunctions.java
index cd5466c..326b1df 100644
--- a/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateToCharFunctions.java
+++ b/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateToCharFunctions.java
@@ -57,7 +57,7 @@ public class G${type}ToChar implements DrillSimpleFunc {
// Get the desired output format and create a DateTimeFormatter
byte[] buf = new byte[right.end - right.start];
right.buffer.getBytes(right.start, buf, 0, right.end - right.start);
- String input = new String(buf);
+ String input = new String(buf, com.google.common.base.Charsets.UTF_8);
format = org.joda.time.format.DateTimeFormat.forPattern(input);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/ToDateTypeFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/ToDateTypeFunctions.java b/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/ToDateTypeFunctions.java
index 30dfc89..fffaef4 100644
--- a/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/ToDateTypeFunctions.java
+++ b/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/ToDateTypeFunctions.java
@@ -50,7 +50,7 @@ public class GTo${type} implements DrillSimpleFunc {
// Get the desired output format
byte[] buf = new byte[right.end - right.start];
right.buffer.getBytes(right.start, buf, 0, right.end - right.start);
- String formatString = new String(buf);
+ String formatString = new String(buf, com.google.common.base.Charsets.UTF_8);
format = org.joda.time.format.DateTimeFormat.forPattern(formatString);
}
@@ -59,7 +59,7 @@ public class GTo${type} implements DrillSimpleFunc {
// Get the input
byte[] buf1 = new byte[left.end - left.start];
left.buffer.getBytes(left.start, buf1, 0, left.end - left.start);
- String input = new String(buf1);
+ String input = new String(buf1, com.google.common.base.Charsets.UTF_8);
<#if type == "Date">
out.value = (org.joda.time.DateMidnight.parse(input, format).withZoneRetainFields(org.joda.time.DateTimeZone.UTC)).getMillis();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java b/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
index b0214f4..e3eb973 100644
--- a/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
+++ b/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
@@ -107,7 +107,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
// not a valid digit
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
- throw new org.apache.drill.common.exceptions.DrillRuntimeException(new String(buf));
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException(new String(buf, com.google.common.base.Charsets.UTF_8));
}
out.value *= radix;
out.value += next;
@@ -117,7 +117,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
if (((integerEndIndex - integerStartIndex) + out.scale) > out.precision) {
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
- throw new org.apache.drill.common.exceptions.DrillRuntimeException("Precision is insufficient for the provided input: " + new String(buf) + " Precision: " + out.precision +
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Precision is insufficient for the provided input: " + new String(buf, com.google.common.base.Charsets.UTF_8) + " Precision: " + out.precision +
" Total Digits: " + (out.scale + (integerEndIndex - integerStartIndex)));
}
@@ -240,7 +240,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
// not a valid digit
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
- throw new NumberFormatException(new String(buf));
+ throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));
}
integerDigits++;
@@ -253,7 +253,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
if (integerDigits + out.scale > out.precision) {
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
- throw new org.apache.drill.common.exceptions.DrillRuntimeException("Precision is insufficient for the provided input: " + new String(buf) + " Precision: " + out.precision + " Total Digits: " + (out.scale + integerDigits));
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Precision is insufficient for the provided input: " + new String(buf, com.google.common.base.Charsets.UTF_8) + " Precision: " + out.precision + " Total Digits: " + (out.scale + integerDigits));
}
@@ -313,7 +313,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
// not a valid digit
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
- throw new NumberFormatException(new String(buf));
+ throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));
}
int value = (out.getInteger(decimalBufferIndex) * radix) + next;
out.setInteger(decimalBufferIndex, value);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java b/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
index 61078c7..3c72a96 100644
--- a/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
@@ -23,8 +23,10 @@
package org.apache.drill.exec.expr.fn.impl.hive;
import com.sun.codemodel.*;
+
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.DirectExpression;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.holders.*;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -153,7 +155,7 @@ public class ObjectInspectorHelper {
<#elseif entry.hiveType == "STRING">
JVar data = jc._else().decl(m.directClass(byte[].class.getCanonicalName()), "data",
castedOI.invoke("getPrimitiveJavaObject").arg(returnValue)
- .invoke("getBytes"));
+ .invoke("getBytes").arg(DirectExpression.direct("com.google.common.base.Charsets.UTF_16")));
jc._else().add(returnValueHolder.ref("buffer")
.invoke("setBytes").arg(JExpr.lit(0)).arg(data));
jc._else().assign(returnValueHolder.ref("start"), JExpr.lit(0));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/ValueHolders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ValueHolders.java b/exec/java-exec/src/main/codegen/templates/ValueHolders.java
index e36e655..7272f4f 100644
--- a/exec/java-exec/src/main/codegen/templates/ValueHolders.java
+++ b/exec/java-exec/src/main/codegen/templates/ValueHolders.java
@@ -136,7 +136,15 @@ public final class ${className} implements ValueHolder{
</#if>
byte[] buf = new byte[end-start];
buffer.getBytes(start, buf, 0, end-start);
- return new String(buf);
+
+ <#switch minor.class>
+ <#case "Var16Char">
+ return new String(buf, com.google.common.base.Charsets.UTF_16);
+ <#break>
+ <#case "VarChar">
+ <#default>
+ return new String(buf, com.google.common.base.Charsets.UTF_8);
+ </#switch>
}
</#if>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 3905bce..6b05ec5 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -463,11 +463,19 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
@Override
public void generateTestData(int size){
boolean even = true;
+ <#switch minor.class>
+ <#case "Var16Char">
+ java.nio.charset.Charset charset = Charsets.UTF_16;
+ <#break>
+ <#case "VarChar">
+ <#default>
+ java.nio.charset.Charset charset = Charsets.UTF_8;
+ </#switch>
for(int i =0; i < size; i++, even = !even){
if(even){
- set(i, new String("aaaaa").getBytes(Charsets.UTF_8));
+ set(i, new String("aaaaa").getBytes(charset));
}else{
- set(i, new String("bbbbbbbbbb").getBytes(Charsets.UTF_8));
+ set(i, new String("bbbbbbbbbb").getBytes(charset));
}
}
setValueCount(size);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
index dff69e1..eabeda2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
@@ -109,28 +109,28 @@ public class TestHiveUDFs extends ExecTest {
for(int i=0; i<exec.getRecordCount(); i++) {
- String in = new String(str1V.getAccessor().get(i));
- String upper = new String(upperStr1V.getAccessor().get(i));
+ String in = new String(str1V.getAccessor().get(i), Charsets.UTF_16);
+ String upper = new String(upperStr1V.getAccessor().get(i), Charsets.UTF_16);
assertTrue(in.toUpperCase().equals(upper));
long unix_timestamp = unix_timestampV.getAccessor().get(i);
- String concat = new String(concatV.getAccessor().get(i));
+ String concat = new String(concatV.getAccessor().get(i), Charsets.UTF_16);
assertTrue(concat.equals(in+"-"+in));
float flt1 = flt1V.getAccessor().get(i);
- String format_number = new String(format_numberV.getAccessor().get(i));
+ String format_number = new String(format_numberV.getAccessor().get(i), Charsets.UTF_16);
String nullableStr1 = null;
/* DRILL-425
if (!nullableStr1V.getAccessor().isNull(i))
- nullableStr1 = new String(nullableStr1V.getAccessor().get(i));*/
+ nullableStr1 = new String(nullableStr1V.getAccessor().get(i), Charsets.UTF_16);*/
String upperNullableStr1 = null;
/* DRILL-425
if (!upperNullableStr1V.getAccessor().isNull(i))
- upperNullableStr1 = new String(upperNullableStr1V.getAccessor().get(i)); */
+ upperNullableStr1 = new String(upperNullableStr1V.getAccessor().get(i), Charsets.UTF_16); */
assertEquals(nullableStr1 != null, upperNullableStr1 != null);
if (nullableStr1 != null)
@@ -184,7 +184,7 @@ public class TestHiveUDFs extends ExecTest {
for(int i=0; i<exec.getRecordCount(); i++) {
- String str1 = new String(str1V.getAccessor().get(i));
+ String str1 = new String(str1V.getAccessor().get(i), Charsets.UTF_16);
int str1Length = str1LengthV.getAccessor().get(i);
assertTrue(str1.length() == str1Length);
[5/6] DRILL-381: Implement SYSTEM and SESSION options.
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig
new file mode 100644
index 0000000..cdfcf60
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.util.Collection;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMultiMap;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.rpc.data.DataConnectionCreator;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
+import org.apache.drill.exec.store.StoragePlugin;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Preconditions;
+
+public class DrillbitContext {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
+
+ private final BootStrapContext context;
+
+ private PhysicalPlanReader reader;
+ private final ClusterCoordinator coord;
+ private final DataConnectionCreator connectionsPool;
+ private final DistributedCache cache;
+ private final DrillbitEndpoint endpoint;
+ private final StoragePluginRegistry storagePlugins;
+ private final OperatorCreatorRegistry operatorCreatorRegistry;
+ private final Controller controller;
+ private final WorkEventBus workBus;
+ private final FunctionImplementationRegistry functionRegistry;
+<<<<<<< HEAD
+
+=======
+ private final DistributedGlobalOptions globalDrillOptions;
+
+>>>>>>> Drill 381 - implementation of session and global options.
+ public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, DistributedCache cache, WorkEventBus workBus) {
+ super();
+ Preconditions.checkNotNull(endpoint);
+ Preconditions.checkNotNull(context);
+ Preconditions.checkNotNull(controller);
+ Preconditions.checkNotNull(connectionsPool);
+ this.workBus = workBus;
+ this.controller = controller;
+ this.context = context;
+ this.coord = coord;
+ this.connectionsPool = connectionsPool;
+ this.cache = cache;
+ this.endpoint = endpoint;
+ this.storagePlugins = new StoragePluginRegistry(this);
+ this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storagePlugins);
+ this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig());
+ this.functionRegistry = new FunctionImplementationRegistry(context.getConfig());
+ this.globalDrillOptions = new DistributedGlobalOptions(this.cache);
+ }
+
+<<<<<<< HEAD
+=======
+
+>>>>>>> Drill 381 - implementation of session and global options.
+ public FunctionImplementationRegistry getFunctionImplementationRegistry() {
+ return functionRegistry;
+ }
+
+ public WorkEventBus getWorkBus(){
+ return workBus;
+ }
+<<<<<<< HEAD
+=======
+
+ public DistributedGlobalOptions getGlobalDrillOptions() {
+ return globalDrillOptions;
+ }
+>>>>>>> Drill 381 - implementation of session and global options.
+
+ public DrillbitEndpoint getEndpoint(){
+ return endpoint;
+ }
+
+ public DrillConfig getConfig() {
+ return context.getConfig();
+ }
+
+ public Collection<DrillbitEndpoint> getBits(){
+ return coord.getAvailableEndpoints();
+ }
+
+ public BufferAllocator getAllocator(){
+ return context.getAllocator();
+ }
+
+ public OperatorCreatorRegistry getOperatorCreatorRegistry() {
+ return operatorCreatorRegistry;
+ }
+
+ public StoragePluginRegistry getStorage(){
+ return this.storagePlugins;
+ }
+
+ public NioEventLoopGroup getBitLoopGroup(){
+ return context.getBitLoopGroup();
+ }
+
+
+ public DataConnectionCreator getDataConnectionsPool(){
+ return connectionsPool;
+ }
+
+ public Controller getController(){
+ return controller;
+ }
+
+ public MetricRegistry getMetrics(){
+ return context.getMetrics();
+ }
+
+ public DistributedCache getCache(){
+ return cache;
+ }
+
+ public PhysicalPlanReader getPlanReader(){
+ return reader;
+ }
+
+ public DrillSchemaFactory getSchemaFactory(){
+ return storagePlugins.getSchemaFactory();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
new file mode 100644
index 0000000..e4b03d3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.server.options.OptionValue.Kind;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+
+import com.typesafe.config.ConfigValue;
+
+public class DrillConfigIterator implements Iterable<OptionValue> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfigIterator.class);
+
+ DrillConfig c;
+ public DrillConfigIterator(DrillConfig c){
+ this.c = c;
+ }
+
+ @Override
+ public Iterator<OptionValue> iterator() {
+ return new Iter(c);
+ }
+
+ public class Iter implements Iterator<OptionValue>{
+
+ Iterator<Entry<String, ConfigValue>> entries;
+ public Iter(DrillConfig c){
+ entries = c.entrySet().iterator();
+ }
+ @Override
+ public boolean hasNext() {
+ return entries.hasNext();
+ }
+
+ @Override
+ public OptionValue next() {
+ Entry<String, ConfigValue> e = entries.next();
+ OptionValue v = new OptionValue();
+ v.name = e.getKey();
+ ConfigValue cv = e.getValue();
+ v.type = OptionType.BOOT;
+ switch(cv.valueType()){
+ case BOOLEAN:
+ v.kind = Kind.BOOLEAN;
+ v.bool_val = (Boolean) cv.unwrapped();
+ break;
+ case LIST:
+ case OBJECT:
+ case STRING:
+ v.string_val = cv.render();
+ break;
+ case NUMBER:
+ v.kind = Kind.LONG;
+ v.num_val = ((Number)cv.unwrapped()).longValue();
+ break;
+ }
+ return v;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java
new file mode 100644
index 0000000..e9620db
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.eigenbase.sql.SqlLiteral;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+public class FragmentOptionsManager implements OptionManager{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentOptionsManager.class);
+
+ ImmutableMap<String, OptionValue> options;
+ OptionManager systemOptions;
+
+ public FragmentOptionsManager(OptionManager systemOptions, OptionList options){
+ Map<String, OptionValue> tmp = Maps.newHashMap();
+ for(OptionValue v : options){
+ tmp.put(v.name, v);
+ }
+ this.options = ImmutableMap.copyOf(tmp);
+ this.systemOptions = systemOptions;
+ }
+
+ @Override
+ public Iterator<OptionValue> iterator() {
+ return Iterables.concat(systemOptions, options.values()).iterator();
+ }
+
+ @Override
+ public OptionValue getOption(String name) {
+ return null;
+ }
+
+ @Override
+ public void setOption(OptionValue value) throws SetOptionException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setOption(String name, SqlLiteral literal) throws SetOptionException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OptionAdmin getAdmin() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OptionManager getSystemManager() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OptionList getSessionOptionList() {
+ throw new UnsupportedOperationException();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
new file mode 100644
index 0000000..b6d4f8c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.ArrayList;
+
+public class OptionList extends ArrayList<OptionValue>{
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
new file mode 100644
index 0000000..3833833
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import org.eigenbase.sql.SqlLiteral;
+
+public interface OptionManager extends Iterable<OptionValue>{
+ public OptionValue getOption(String name);
+ public void setOption(OptionValue value) throws SetOptionException;
+ public void setOption(String name, SqlLiteral literal) throws SetOptionException;
+ public OptionAdmin getAdmin();
+ public OptionManager getSystemManager();
+ public OptionList getSessionOptionList();
+
+ public interface OptionAdmin{
+ public void registerOptionType(OptionValidator validator);
+ public void validate(OptionValue v) throws SetOptionException;
+ public OptionValue validate(String name, SqlLiteral value) throws SetOptionException;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
new file mode 100644
index 0000000..5b90ba5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
@@ -0,0 +1,65 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server.options;
+
+
+import org.apache.drill.common.exceptions.ExpressionParsingException;
+import org.eigenbase.sql.SqlLiteral;
+
+/**
+ * Validates the values provided to Drill options.
+ *
+ * @param <E>
+ */
+public abstract class OptionValidator {
+
+ // Stored here as well as in the option static class to allow insertion of option optionName into
+ // the error messages produced by the validator
+ private String optionName;
+
+ public OptionValidator(String optionName){
+ this.optionName = optionName;
+ }
+
+ /**
+ * This method determines if a given value is a valid setting for an option. For options that support some
+ * ambiguity in their settings, such as case-insensitivity for string options, this method returns a modified
+ * version of the passed value that is considered the standard format of the option that should be used for
+ * system-internal representation.
+ *
+ * @param value - the value to validate
+ * @return - the value requested, in its standard format to be used for representing the value within Drill
+ * Example: all lower case values for strings, to avoid ambiguities in how values are stored
+ * while allowing some flexibility for users
+ * @throws ExpressionParsingException - message to describe error with value, including range or list of expected values
+ */
+ public abstract OptionValue validate(SqlLiteral value) throws ExpressionParsingException;
+
+ public String getOptionName() {
+ return optionName;
+ }
+
+ public String getDefaultString(){
+ return null;
+ }
+
+
+ public abstract OptionValue getDefault();
+
+ public abstract void validate(OptionValue v) throws ExpressionParsingException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
new file mode 100644
index 0000000..7b4f7f6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import org.apache.drill.exec.cache.JacksonSerializable;
+
+import com.google.common.base.Preconditions;
+
+
+public class OptionValue extends JacksonSerializable {
+
+ public static enum OptionType {
+ BOOT, SYSTEM, SESSION
+ }
+
+ public static enum Kind {
+ BOOLEAN, LONG, STRING, DOUBLE
+ }
+
+ public String name;
+ public Kind kind;
+ public OptionType type;
+ public Long num_val;
+ public String string_val;
+ public Boolean bool_val;
+ public Double float_val;
+
+ public static OptionValue createLong(OptionType type, String name, long val) {
+ return new OptionValue(Kind.LONG, type, name, val, null, null, null);
+ }
+
+ public static OptionValue createBoolean(OptionType type, String name, boolean bool) {
+ return new OptionValue(Kind.BOOLEAN, type, name, null, null, bool, null);
+ }
+
+ public static OptionValue createString(OptionType type, String name, String val) {
+ return new OptionValue(Kind.STRING, type, name, null, val, null, null);
+ }
+
+ public static OptionValue createDouble(OptionType type, String name, double val) {
+ return new OptionValue(Kind.DOUBLE, type, name, null, null, null, val);
+ }
+
+ public OptionValue(){}
+
+ private OptionValue(Kind kind, OptionType type, String name, Long num_val, String string_val, Boolean bool_val, Double float_val) {
+ super();
+ Preconditions.checkArgument(num_val != null || string_val != null || bool_val != null);
+ this.name = name;
+ this.kind = kind;
+ this.float_val = float_val;
+ this.type = type;
+ this.num_val = num_val;
+ this.string_val = string_val;
+ this.bool_val = bool_val;
+ this.type = type;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((bool_val == null) ? 0 : bool_val.hashCode());
+ result = prime * result + ((float_val == null) ? 0 : float_val.hashCode());
+ result = prime * result + ((kind == null) ? 0 : kind.hashCode());
+ result = prime * result + ((name == null) ? 0 : name.hashCode());
+ result = prime * result + ((num_val == null) ? 0 : num_val.hashCode());
+ result = prime * result + ((string_val == null) ? 0 : string_val.hashCode());
+ result = prime * result + ((type == null) ? 0 : type.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ OptionValue other = (OptionValue) obj;
+ if (bool_val == null) {
+ if (other.bool_val != null)
+ return false;
+ } else if (!bool_val.equals(other.bool_val))
+ return false;
+ if (float_val == null) {
+ if (other.float_val != null)
+ return false;
+ } else if (!float_val.equals(other.float_val))
+ return false;
+ if (kind != other.kind)
+ return false;
+ if (name == null) {
+ if (other.name != null)
+ return false;
+ } else if (!name.equals(other.name))
+ return false;
+ if (num_val == null) {
+ if (other.num_val != null)
+ return false;
+ } else if (!num_val.equals(other.num_val))
+ return false;
+ if (string_val == null) {
+ if (other.string_val != null)
+ return false;
+ } else if (!string_val.equals(other.string_val))
+ return false;
+ if (type != other.type)
+ return false;
+ return true;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
new file mode 100644
index 0000000..993cead
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.eigenbase.sql.SqlLiteral;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+public class SessionOptionManager implements OptionManager{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class);
+
+ private Map<String, OptionValue> options = Maps.newConcurrentMap();
+ private OptionManager systemOptions;
+
+ public SessionOptionManager(OptionManager systemOptions) {
+ super();
+ this.systemOptions = systemOptions;
+ }
+
+ @Override
+ public Iterator<OptionValue> iterator() {
+ return Iterables.concat(systemOptions, options.values()).iterator();
+ }
+
+ @Override
+ public OptionValue getOption(String name) {
+ OptionValue opt = options.get(name);
+ if(opt == null){
+ return systemOptions.getOption(name);
+ }else{
+ return opt;
+ }
+ }
+
+ @Override
+ public void setOption(OptionValue value) {
+ systemOptions.getAdmin().validate(value);
+ setValidatedOption(value);
+ }
+
+
+ @Override
+ public OptionList getSessionOptionList() {
+ OptionList list = new OptionList();
+ for(OptionValue o : options.values()){
+ list.add(o);
+ }
+ return list;
+ }
+
+ private void setValidatedOption(OptionValue value){
+ if(value.type == OptionType.SYSTEM){
+ systemOptions.setOption(value);
+ }else{
+ options.put(value.name, value);
+ }
+ }
+
+ @Override
+ public void setOption(String name, SqlLiteral literal) {
+ OptionValue val = systemOptions.getAdmin().validate(name, literal);
+ val.type = OptionValue.OptionType.SESSION;
+ setValidatedOption(val);
+ }
+
+ @Override
+ public OptionAdmin getAdmin() {
+ return systemOptions.getAdmin();
+ }
+
+ @Override
+ public OptionManager getSystemManager() {
+ return systemOptions;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java
new file mode 100644
index 0000000..dd698c3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Set;
+
+import javax.validation.ConstraintViolation;
+
+import org.apache.drill.common.exceptions.LogicalPlanParsingException;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.LogicalOperatorBase;
+
+public class SetOptionException extends LogicalPlanParsingException{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionException.class);
+
+ public SetOptionException() {
+ super();
+
+ }
+
+ public SetOptionException(LogicalOperator operator, Set<ConstraintViolation<LogicalOperatorBase>> violations) {
+ super(operator, violations);
+
+ }
+
+ public SetOptionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+
+ }
+
+ public SetOptionException(String message, Throwable cause) {
+ super(message, cause);
+
+ }
+
+ public SetOptionException(String message) {
+ super(message);
+
+ }
+
+ public SetOptionException(Throwable cause) {
+ super(cause);
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
new file mode 100644
index 0000000..98975e4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.eigenbase.sql.SqlLiteral;
+
+import com.google.common.collect.Maps;
+
+public class SystemOptionManager implements OptionManager{
+
+ private final OptionValidator[] VALIDATORS = {
+ PlannerSettings.EXCHANGE
+ };
+
+ private DistributedMap<OptionValue> options;
+ private SystemOptionAdmin admin;
+ private final ConcurrentMap<String, OptionValidator> knownOptions = Maps.newConcurrentMap();
+ private DistributedCache cache;
+
+ public SystemOptionManager(DistributedCache cache){
+ this.cache = cache;
+ }
+
+ public void init(){
+ this.options = cache.getNamedMap("system.options", OptionValue.class);
+ this.admin = new SystemOptionAdmin();
+ }
+
+ private class Iter implements Iterator<OptionValue>{
+ private Iterator<Map.Entry<String, OptionValue>> inner;
+
+ public Iter(Iterator<Map.Entry<String, OptionValue>> inner){
+ this.inner = inner;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return inner.hasNext();
+ }
+
+ @Override
+ public OptionValue next() {
+ return inner.next().getValue();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+ @Override
+ public Iterator<OptionValue> iterator() {
+ return new Iter(options.iterator());
+ }
+
+ @Override
+ public OptionValue getOption(String name) {
+ return options.get(name);
+ }
+
+ @Override
+ public void setOption(OptionValue value) {
+ admin.validate(value);
+ assert value.type == OptionType.SYSTEM;
+ options.put(value.name, value);
+ }
+
+ @Override
+ public void setOption(String name, SqlLiteral literal) {
+ OptionValue v = admin.validate(name, literal);
+ v.type = OptionValue.OptionType.SYSTEM;
+ options.put(name, v);
+ }
+
+ @Override
+ public OptionList getSessionOptionList() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OptionManager getSystemManager() {
+ return this;
+ }
+
+ @Override
+ public OptionAdmin getAdmin() {
+ return admin;
+ }
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemOptionManager.class);
+
+
+ private class SystemOptionAdmin implements OptionAdmin{
+
+ public SystemOptionAdmin(){
+ for(OptionValidator v : VALIDATORS){
+ knownOptions.put(v.getOptionName(), v);
+ options.putIfAbsent(v.getOptionName(), v.getDefault());
+ }
+ }
+
+
+ @Override
+ public void registerOptionType(OptionValidator validator) {
+ if(null != knownOptions.putIfAbsent(validator.getOptionName(), validator) ){
+ throw new IllegalArgumentException("Only one option is allowed to be registered with name: " + validator.getOptionName());
+ }
+ }
+
+ @Override
+ public void validate(OptionValue v) throws SetOptionException {
+ OptionValidator validator = knownOptions.get(v.name);
+ if(validator == null) throw new SetOptionException("Unknown option " + v.name);
+ validator.validate(v);
+ }
+
+ @Override
+ public OptionValue validate(String name, SqlLiteral value) throws SetOptionException {
+ OptionValidator validator = knownOptions.get(name);
+ if(validator == null) throw new SetOptionException("Unknown option " + name);
+ return validator.validate(value);
+ }
+
+
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
new file mode 100644
index 0000000..0d681ea
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.math.BigDecimal;
+
+import org.apache.drill.common.exceptions.ExpressionParsingException;
+import org.apache.drill.exec.server.options.OptionValue.Kind;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.eigenbase.sql.SqlLiteral;
+
+public class TypeValidators {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TypeValidators.class);
+
+ public static class PositiveLongValidator extends LongValidator {
+
+ private final long max;
+
+ public PositiveLongValidator(String name, long max, long def) {
+ super(name, def);
+ this.max = max;
+ }
+
+ @Override
+ public void extraValidate(OptionValue v) throws ExpressionParsingException {
+ if (v.num_val > max || v.num_val < 0)
+ throw new ExpressionParsingException(String.format("Option %s must be between %d and %d.", getOptionName(), 0,
+ max));
+ }
+ }
+
+ public static class BooleanValidator extends TypeValidator{
+ public BooleanValidator(String name, boolean def){
+ super(name, Kind.BOOLEAN, OptionValue.createBoolean(OptionType.SYSTEM, name, def));
+ }
+ }
+ public static class StringValidator extends TypeValidator{
+ public StringValidator(String name, String def){
+ super(name, Kind.LONG, OptionValue.createString(OptionType.SYSTEM, name, def));
+ }
+
+ }
+ public static class LongValidator extends TypeValidator{
+ public LongValidator(String name, long def){
+ super(name, Kind.LONG, OptionValue.createLong(OptionType.SYSTEM, name, def));
+ }
+ }
+ public static class DoubleValidator extends TypeValidator{
+
+ public DoubleValidator(String name, double def){
+ super(name, Kind.DOUBLE, OptionValue.createDouble(OptionType.SYSTEM, name, def));
+ }
+
+
+ }
+
+ public static abstract class TypeValidator extends OptionValidator {
+ final Kind kind;
+ private OptionValue defaultValue;
+
+ public TypeValidator(String name, Kind kind, OptionValue defValue) {
+ super(name);
+ this.kind = kind;
+ this.defaultValue = defValue;
+ }
+
+ @Override
+ public OptionValue getDefault() {
+ return defaultValue;
+ }
+
+ @Override
+ public OptionValue validate(SqlLiteral value) throws ExpressionParsingException {
+ OptionValue op = getPartialValue(getOptionName(), (OptionType) null, value);
+ validate(op);
+ return op;
+ }
+
+ @Override
+ public final void validate(OptionValue v) throws ExpressionParsingException {
+ if (v.kind != kind)
+ throw new ExpressionParsingException(String.format("Option %s must be of type %s but you tried to set to %s.",
+ getOptionName(), kind.name(), v.kind.name()));
+ }
+
+ public void extraValidate(OptionValue v) throws ExpressionParsingException {
+ }
+
+ }
+
+ public static OptionValue getPartialValue(String name, OptionType type, SqlLiteral literal) {
+ switch (literal.getTypeName()) {
+ case DECIMAL:
+ case DOUBLE:
+ case FLOAT:
+ return OptionValue.createDouble(type, name, ((BigDecimal) literal.getValue()).doubleValue());
+
+ case SMALLINT:
+ case TINYINT:
+ case BIGINT:
+ case INTEGER:
+ return OptionValue.createLong(type, name, ((BigDecimal) literal.getValue()).longValue());
+
+ case VARBINARY:
+ case VARCHAR:
+ case CHAR:
+ return OptionValue.createString(type, name, (String) literal.getValue());
+
+ case BOOLEAN:
+ return OptionValue.createBoolean(type, name, (Boolean) literal.getValue());
+
+ }
+
+ throw new ExpressionParsingException(String.format(
+ "Drill doesn't support set option expressions with literals of type %s.", literal.getTypeName()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 4d88686..948c74f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import com.google.common.base.Preconditions;
import net.hydromatic.linq4j.expressions.DefaultExpression;
import net.hydromatic.linq4j.expressions.Expression;
import net.hydromatic.optiq.SchemaPlus;
@@ -40,10 +39,9 @@ import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.util.PathScanner;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.planner.logical.DrillRuleSets;
import org.apache.drill.exec.cache.DistributedMap;
-import org.apache.drill.exec.cache.JacksonDrillSerializable.StoragePluginsSerializable;
import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.planner.logical.DrillRuleSets;
import org.apache.drill.exec.planner.logical.StoragePlugins;
import org.apache.drill.exec.rpc.user.DrillUser;
import org.apache.drill.exec.server.DrillbitContext;
@@ -51,9 +49,12 @@ import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
+import org.apache.drill.exec.store.sys.SystemTablePlugin;
+import org.apache.drill.exec.store.sys.SystemTablePluginConfig;
import org.eigenbase.relopt.RelOptRule;
import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
@@ -91,9 +92,9 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
int i =0;
for(Constructor<?> c : plugin.getConstructors()){
Class<?>[] params = c.getParameterTypes();
- if(params.length != 3
- || params[1] != DrillbitContext.class
- || !StoragePluginConfig.class.isAssignableFrom(params[0])
+ if(params.length != 3
+ || params[1] != DrillbitContext.class
+ || !StoragePluginConfig.class.isAssignableFrom(params[0])
|| params[2] != String.class){
logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a [constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
continue;
@@ -109,7 +110,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
// create registered plugins defined in "storage-plugins.json"
this.plugins = ImmutableMap.copyOf(createPlugins());
- // query registered engines for optimizer rules and build the storage plugin RuleSet
+ // query registered engines for optimizer rules and build the storage plugin RuleSet
Builder<RelOptRule> setBuilder = ImmutableSet.builder();
for (StoragePlugin plugin : this.plugins.values()) {
Set<StoragePluginOptimizerRule> rules = plugin.getOptimizerRules();
@@ -135,17 +136,15 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
String pluginsData = Resources.toString(url, Charsets.UTF_8);
plugins = context.getConfig().getMapper().readValue(pluginsData, StoragePlugins.class);
}
- DistributedMap<StoragePluginsSerializable> map = context.getCache().getMap(StoragePluginsSerializable.class);
- StoragePluginsSerializable cachedPluginsSerializable = map.get("storage-plugins");
- if (cachedPluginsSerializable != null) {
- cachedPlugins = cachedPluginsSerializable.getObj();
+ DistributedMap<StoragePlugins> map = context.getCache().getMap(StoragePlugins.class);
+ cachedPlugins = map.get("storage-plugins");
+ if (cachedPlugins != null) {
logger.debug("Found cached storage plugin config: {}", cachedPlugins);
} else {
Preconditions.checkNotNull(plugins,"No storage plugin configuration found");
logger.debug("caching storage plugin config {}", plugins);
- map.put("storage-plugins", new StoragePluginsSerializable(context, plugins));
- cachedPluginsSerializable = map.get("storage-plugins");
- cachedPlugins = cachedPluginsSerializable.getObj();
+ map.put("storage-plugins", plugins);
+ cachedPlugins = map.get("storage-plugins");
}
if(!(plugins == null || cachedPlugins.equals(plugins))) {
logger.error("Storage plugin config mismatch. {}. {}", plugins, cachedPlugins);
@@ -155,7 +154,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
}catch(IOException e){
throw new IllegalStateException("Failure while reading storage plugins data.", e);
}
-
+
for(Map.Entry<String, StoragePluginConfig> config : cachedPlugins){
try{
StoragePlugin plugin = create(config.getKey(), config.getValue());
@@ -165,14 +164,15 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
}
}
activePlugins.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
-
+ activePlugins.put("sys", new SystemTablePlugin(SystemTablePluginConfig.INSTANCE, context, "sys"));
+
return activePlugins;
}
public StoragePlugin getPlugin(String registeredStoragePluginName) throws ExecutionSetupException {
return plugins.get(registeredStoragePluginName);
}
-
+
public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
if(config instanceof NamedStoragePluginConfig){
return plugins.get(((NamedStoragePluginConfig) config).name);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig
new file mode 100644
index 0000000..2c75651
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import net.hydromatic.linq4j.expressions.DefaultExpression;
+import net.hydromatic.linq4j.expressions.Expression;
+import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.tools.RuleSet;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.logical.DrillRuleSets;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.cache.JacksonDrillSerializable.StoragePluginsSerializable;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.rpc.user.DrillUser;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
+import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
+import org.eigenbase.relopt.RelOptRule;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSet.Builder;
+import com.google.common.io.Resources;
+import org.apache.drill.exec.store.options.OptionValueStorageConfig;
+import org.apache.drill.exec.store.options.OptionValueStoragePlugin;
+
+
+public class StoragePluginRegistry implements Iterable<Map.Entry<String, StoragePlugin>>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistry.class);
+
+ private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<Object, Constructor<? extends StoragePlugin>>();
+ private ImmutableMap<String, StoragePlugin> plugins;
+
+ private DrillbitContext context;
+ private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
+
+ private RuleSet storagePluginsRuleSet;
+
+ private static final Expression EXPRESSION = new DefaultExpression(Object.class);
+
+ public StoragePluginRegistry(DrillbitContext context) {
+ try{
+ this.context = context;
+ }catch(RuntimeException e){
+ logger.error("Failure while loading storage plugin registry.", e);
+ throw new RuntimeException("Faiure while reading and loading storage plugin configuration.", e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void init() throws DrillbitStartupException {
+ DrillConfig config = context.getConfig();
+ Collection<Class<? extends StoragePlugin>> plugins = PathScanner.scanForImplementations(StoragePlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+ logger.debug("Loading storage plugins {}", plugins);
+ for(Class<? extends StoragePlugin> plugin: plugins){
+ int i =0;
+ for(Constructor<?> c : plugin.getConstructors()){
+ Class<?>[] params = c.getParameterTypes();
+ if(params.length != 3
+ || params[1] != DrillbitContext.class
+ || !StoragePluginConfig.class.isAssignableFrom(params[0])
+ || params[2] != String.class){
+ logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a [constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
+ continue;
+ }
+ availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
+ i++;
+ }
+ if(i == 0){
+ logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters of (StorangePluginConfig, Config)", plugin.getCanonicalName());
+ }
+ }
+
+ // create registered plugins defined in "storage-plugins.json"
+ this.plugins = ImmutableMap.copyOf(createPlugins());
+
+ // query registered engines for optimizer rules and build the storage plugin RuleSet
+ Builder<RelOptRule> setBuilder = ImmutableSet.builder();
+ for (StoragePlugin plugin : this.plugins.values()) {
+ Set<StoragePluginOptimizerRule> rules = plugin.getOptimizerRules();
+ if (rules != null && rules.size() > 0) {
+ setBuilder.addAll(rules);
+ }
+ }
+ this.storagePluginsRuleSet = DrillRuleSets.create(setBuilder.build());
+ }
+
+ private Map<String, StoragePlugin> createPlugins() throws DrillbitStartupException {
+ /*
+ * Check if "storage-plugins.json" exists. Also check if "storage-plugins" object exists in Distributed Cache.
+ * If both exist, check that they are the same. If they differ, throw exception. If "storage-plugins.json" exists, but
+ * nothing found in cache, then add it to the cache. If neither are found, throw exception.
+ */
+ StoragePlugins plugins = null;
+ StoragePlugins cachedPlugins = null;
+ Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>();
+ try{
+ URL url = Resources.class.getClassLoader().getResource("storage-plugins.json");
+ if (url != null) {
+ String pluginsData = Resources.toString(url, Charsets.UTF_8);
+ plugins = context.getConfig().getMapper().readValue(pluginsData, StoragePlugins.class);
+ }
+ DistributedMap<StoragePluginsSerializable> map = context.getCache().getMap(StoragePluginsSerializable.class);
+ StoragePluginsSerializable cachedPluginsSerializable = map.get("storage-plugins");
+ if (cachedPluginsSerializable != null) {
+ cachedPlugins = cachedPluginsSerializable.getObj();
+ logger.debug("Found cached storage plugin config: {}", cachedPlugins);
+ } else {
+ Preconditions.checkNotNull(plugins,"No storage plugin configuration found");
+ logger.debug("caching storage plugin config {}", plugins);
+ map.put("storage-plugins", new StoragePluginsSerializable(context, plugins));
+ cachedPluginsSerializable = map.get("storage-plugins");
+ cachedPlugins = cachedPluginsSerializable.getObj();
+ }
+ if(!(plugins == null || cachedPlugins.equals(plugins))) {
+ logger.error("Storage plugin config mismatch. {}. {}", plugins, cachedPlugins);
+ throw new DrillbitStartupException("Storage plugin config mismatch");
+ }
+ logger.debug("using plugin config: {}", cachedPlugins);
+ }catch(IOException e){
+ throw new IllegalStateException("Failure while reading storage plugins data.", e);
+ }
+
+ for(Map.Entry<String, StoragePluginConfig> config : cachedPlugins){
+ try{
+ StoragePlugin plugin = create(config.getKey(), config.getValue());
+ activePlugins.put(config.getKey(), plugin);
+ }catch(ExecutionSetupException e){
+ logger.error("Failure while setting up StoragePlugin with name: '{}'.", config.getKey(), e);
+ }
+ }
+<<<<<<< HEAD
+ activePlugins.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
+
+ return activePlugins;
+=======
+ activeEngines.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
+ activeEngines.put("OPTIONS", new OptionValueStoragePlugin(new OptionValueStorageConfig(), context, "OPTIONS"));
+
+ return activeEngines;
+>>>>>>> Drill 381 - implementation of session and global options.
+ }
+
+ public StoragePlugin getPlugin(String registeredStoragePluginName) throws ExecutionSetupException {
+ return plugins.get(registeredStoragePluginName);
+ }
+
+ public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
+ if(config instanceof NamedStoragePluginConfig){
+ return plugins.get(((NamedStoragePluginConfig) config).name);
+ }else{
+ // TODO: for now, we'll throw away transient configs. we really ought to clean these up.
+ return create(null, config);
+ }
+ }
+
+ public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException{
+ StoragePlugin p = getPlugin(storageConfig);
+ if(!(p instanceof FileSystemPlugin)) throw new ExecutionSetupException(String.format("You tried to request a format plugin for a storage plugin that wasn't of type FileSystemPlugin. The actual type of plugin was %s.", p.getClass().getName()));
+ FileSystemPlugin storage = (FileSystemPlugin) p;
+ return storage.getFormatPlugin(formatConfig);
+ }
+
+ private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
+ StoragePlugin plugin = null;
+ Constructor<? extends StoragePlugin> c = availablePlugins.get(pluginConfig.getClass());
+ if (c == null)
+ throw new ExecutionSetupException(String.format("Failure finding StoragePlugin constructor for config %s",
+ pluginConfig));
+ try {
+ plugin = c.newInstance(pluginConfig, context, name);
+ return plugin;
+ } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
+ if (t instanceof ExecutionSetupException)
+ throw ((ExecutionSetupException) t);
+ throw new ExecutionSetupException(String.format(
+ "Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
+ }
+ }
+
+ @Override
+ public Iterator<Entry<String, StoragePlugin>> iterator() {
+ return plugins.entrySet().iterator();
+ }
+
+ public RuleSet getStoragePluginRuleSet() {
+ return storagePluginsRuleSet;
+ }
+
+ public DrillSchemaFactory getSchemaFactory(){
+ return schemaFactory;
+ }
+
+ public class DrillSchemaFactory implements SchemaFactory{
+
+ @Override
+ public void registerSchemas(DrillUser user, SchemaPlus parent) {
+ for(Map.Entry<String, StoragePlugin> e : plugins.entrySet()){
+ e.getValue().registerSchemas(user, parent);
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
index 256b6b6..eed4f03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.physical.base.SubScan;
@@ -83,4 +84,10 @@ public class DirectGroupScan extends AbstractGroupScan{
public String getDigest() {
return String.valueOf(reader);
}
+
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ return this;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
index 8ae5116..84b6690 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -29,6 +30,7 @@ import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.physical.base.SubScan;
@@ -74,13 +76,13 @@ public class MockGroupScanPOP extends AbstractGroupScan {
public List<MockScanEntry> getReadEntries() {
return readEntries;
}
-
+
public static class MockScanEntry{
private final int records;
private final MockColumn[] types;
private final int recordSize;
-
+
@JsonCreator
public MockScanEntry(@JsonProperty("records") int records, @JsonProperty("types") MockColumn[] types) {
@@ -97,7 +99,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
public OperatorCost getCost() {
return new OperatorCost(1, 2, 1, 1);
}
-
+
public int getRecords() {
return records;
}
@@ -116,7 +118,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
return "MockScanEntry [records=" + records + ", columns=" + Arrays.toString(types) + "]";
}
}
-
+
@JsonInclude(Include.NON_NULL)
public static class MockColumn{
@JsonProperty("type") public MinorType minorType;
@@ -125,8 +127,8 @@ public class MockGroupScanPOP extends AbstractGroupScan {
public Integer width;
public Integer precision;
public Integer scale;
-
-
+
+
@JsonCreator
public MockColumn(@JsonProperty("name") String name, @JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
this.name = name;
@@ -136,7 +138,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
this.precision = precision;
this.scale = scale;
}
-
+
@JsonProperty("type")
public MinorType getMinorType() {
return minorType;
@@ -156,7 +158,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
public Integer getScale() {
return scale;
}
-
+
@JsonIgnore
public MajorType getMajorType(){
MajorType.Builder b = MajorType.newBuilder();
@@ -172,7 +174,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
public String toString() {
return "MockColumn [minorType=" + minorType + ", name=" + name + ", mode=" + mode + "]";
}
-
+
}
@Override
@@ -184,7 +186,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
@Override
public void applyAssignments(List<DrillbitEndpoint> endpoints) {
Preconditions.checkArgument(endpoints.size() <= getReadEntries().size());
-
+
mappings = new LinkedList[endpoints.size()];
int i =0;
@@ -230,6 +232,11 @@ public class MockGroupScanPOP extends AbstractGroupScan {
}
@Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ return this;
+ }
+
+ @Override
public String getDigest() {
return toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
index f0d4fb6..0c9a1cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
@@ -48,7 +48,7 @@ abstract class AbstractWriter<V extends ValueVector> implements PojoWriter{
@Override
public void allocate() {
- AllocationHelper.allocate(vector, 500, 100);
+ vector.allocateNew();
}
public void setValueCount(int valueCount){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
new file mode 100644
index 0000000..8ecb29f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pojo;
+
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.sql.type.SqlTypeName;
+
+import com.google.common.collect.Lists;
+
+public class PojoDataType {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoDataType.class);
+
+ public List<SqlTypeName> types = Lists.newArrayList();
+ public List<String> names = Lists.newArrayList();
+
+ public PojoDataType(Class<?> pojoClass){
+ logger.debug(pojoClass.getName());
+ Field[] fields = pojoClass.getDeclaredFields();
+ for(int i = 0; i < fields.length; i++){
+ Field f = fields[i];
+
+ Class<?> type = f.getType();
+ names.add(f.getName());
+
+ if(type == int.class || type == Integer.class){
+ types.add(SqlTypeName.INTEGER);
+ }else if(type == boolean.class || type == Boolean.class){
+ types.add(SqlTypeName.BOOLEAN);
+ }else if(type == long.class || type == Long.class){
+ types.add(SqlTypeName.BIGINT);
+ }else if(type == double.class || type == Double.class){
+ types.add(SqlTypeName.DOUBLE);
+ }else if(type == String.class){
+ types.add(SqlTypeName.VARCHAR);
+ }else if(type.isEnum()){
+ types.add(SqlTypeName.VARCHAR);
+ }else{
+ throw new RuntimeException(String.format("PojoRecord reader doesn't yet support conversions from type [%s].", type));
+ }
+ }
+ }
+
+
+ public RelDataType getRowType(RelDataTypeFactory f){
+ List<RelDataType> fields = Lists.newArrayList();
+ for(SqlTypeName n : types){
+ fields.add(f.createSqlType(n));
+ }
+ return f.createStructType(fields, names);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index 8dac455..4203abc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -25,8 +25,14 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.pojo.Writers.BitWriter;
+import org.apache.drill.exec.store.pojo.Writers.DoubleWriter;
+import org.apache.drill.exec.store.pojo.Writers.EnumWriter;
import org.apache.drill.exec.store.pojo.Writers.IntWriter;
import org.apache.drill.exec.store.pojo.Writers.LongWriter;
+import org.apache.drill.exec.store.pojo.Writers.NBigIntWriter;
+import org.apache.drill.exec.store.pojo.Writers.NBooleanWriter;
+import org.apache.drill.exec.store.pojo.Writers.NDoubleWriter;
+import org.apache.drill.exec.store.pojo.Writers.NIntWriter;
import org.apache.drill.exec.store.pojo.Writers.StringWriter;
public class PojoRecordReader<T> implements RecordReader{
@@ -48,7 +54,7 @@ public class PojoRecordReader<T> implements RecordReader{
@Override
public void setup(OutputMutator output) throws ExecutionSetupException {
try{
- Field[] fields = pojoClass.getFields();
+ Field[] fields = pojoClass.getDeclaredFields();
writers = new PojoWriter[fields.length];
for(int i = 0; i < writers.length; i++){
Field f = fields[i];
@@ -56,6 +62,18 @@ public class PojoRecordReader<T> implements RecordReader{
if(type == int.class){
writers[i] = new IntWriter(f);
+ }else if(type == Integer.class){
+ writers[i] = new NIntWriter(f);
+ }else if(type == Long.class){
+ writers[i] = new NBigIntWriter(f);
+ }else if(type == Boolean.class){
+ writers[i] = new NBooleanWriter(f);
+ }else if(type == double.class){
+ writers[i] = new DoubleWriter(f);
+ }else if(type == Double.class){
+ writers[i] = new NDoubleWriter(f);
+ }else if(type.isEnum()){
+ writers[i] = new EnumWriter(f);
}else if(type == boolean.class){
writers[i] = new BitWriter(f);
}else if(type == long.class){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
index 6910903..b986be8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
@@ -27,7 +27,12 @@ import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.Float8Vector;
import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.NullableVarCharVector;
import com.google.common.base.Charsets;
@@ -81,18 +86,32 @@ public class Writers {
}
- public static class StringWriter extends AbstractWriter<NullableVarCharVector>{
+ public static class DoubleWriter extends AbstractWriter<Float8Vector>{
+ public DoubleWriter(Field field) {
+ super(field, Types.required(MinorType.FLOAT8));
+ if(field.getType() != double.class) throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+ double d = field.getDouble(pojo);
+
+ return vector.getMutator().setSafe(outboundIndex, d);
+ }
+
+ }
+
+ private abstract static class AbstractStringWriter extends AbstractWriter<NullableVarCharVector>{
private ByteBuf data;
private final NullableVarCharHolder h = new NullableVarCharHolder();
- public StringWriter(Field field) {
+ public AbstractStringWriter(Field field) {
super(field, Types.optional(MinorType.VARCHAR));
- if(field.getType() != String.class) throw new IllegalStateException();
ensureLength(100);
}
- private void ensureLength(int len){
+ void ensureLength(int len){
if(data == null || data.capacity() < len){
if(data != null) data.release();
data = UnpooledByteBufAllocator.DEFAULT.buffer(len);
@@ -103,11 +122,9 @@ public class Writers {
data.release();
}
- @Override
- public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
- String s = (String) field.get(pojo);
+ public boolean writeString(String s, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
if(s == null){
- h.isSet = 0;
+ return true;
}else{
h.isSet = 1;
byte[] bytes = s.getBytes(Charsets.UTF_8);
@@ -117,10 +134,108 @@ public class Writers {
h.buffer = data;
h.start = 0;
h.end = bytes.length;
+ return vector.getMutator().setSafe(outboundIndex, h);
+
+ }
+
+ }
+
+ }
+
+ public static class EnumWriter extends AbstractStringWriter{
+ public EnumWriter(Field field) {
+ super(field);
+ if(!field.getType().isEnum()) throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+ Enum<?> e= ((Enum<?>) field.get(pojo));
+ if(e == null) return true;
+ return writeString(e.name(), outboundIndex);
+ }
+ }
+
+ public static class StringWriter extends AbstractStringWriter {
+ public StringWriter(Field field) {
+ super(field);
+ if(field.getType() != String.class) throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+ String s = (String) field.get(pojo);
+ return writeString(s, outboundIndex);
+ }
+ }
+
+ public static class NIntWriter extends AbstractWriter<NullableIntVector>{
+
+ public NIntWriter(Field field) {
+ super(field, Types.optional(MinorType.INT));
+ if(field.getType() != Integer.class) throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+ Integer i = (Integer) field.get(pojo);
+ if(i != null){
+ return vector.getMutator().setSafe(outboundIndex, i);
+ }
+ return true;
+ }
+
+ }
+ public static class NBigIntWriter extends AbstractWriter<NullableBigIntVector>{
+
+ public NBigIntWriter(Field field) {
+ super(field, Types.optional(MinorType.BIGINT));
+ if(field.getType() != Long.class) throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+ Long o = (Long) field.get(pojo);
+ if(o != null){
+ return vector.getMutator().setSafe(outboundIndex, o);
}
+ return true;
+ }
- return vector.getMutator().setSafe(outboundIndex, h);
+ }
+
+ public static class NBooleanWriter extends AbstractWriter<NullableBitVector>{
+
+ public NBooleanWriter(Field field) {
+ super(field, Types.optional(MinorType.BIT));
+ if(field.getType() != Boolean.class) throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+ Boolean o = (Boolean) field.get(pojo);
+ if(o != null){
+ return vector.getMutator().setSafe(outboundIndex, o ? 1 : 0);
+ }
+ return true;
+ }
+
+ }
+ public static class NDoubleWriter extends AbstractWriter<NullableFloat8Vector>{
+
+ public NDoubleWriter(Field field) {
+ super(field, Types.optional(MinorType.FLOAT8));
+ if(field.getType() != Double.class) throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+ Double o = (Double) field.get(pojo);
+ if(o != null){
+ return vector.getMutator().setSafe(outboundIndex, o);
+ }
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
new file mode 100644
index 0000000..844fd68
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+public class DrillbitIterator implements Iterator<Object> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitIterator.class);
+
+ private Iterator<DrillbitEndpoint> endpoints;
+
+ public DrillbitIterator(FragmentContext c) {
+ this.endpoints = c.getDrillbitContext().getBits().iterator();
+ }
+
+ public static class DrillbitInstance {
+ public String host;
+ public int user_port;
+ public int control_port;
+ public int data_port;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return endpoints.hasNext();
+ }
+
+ @Override
+ public Object next() {
+ DrillbitEndpoint ep = endpoints.next();
+ DrillbitInstance i = new DrillbitInstance();
+ i.host = ep.getAddress();
+ i.user_port = ep.getUserPort();
+ i.control_port = ep.getControlPort();
+ i.data_port = ep.getDataPort();
+ return i;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
new file mode 100644
index 0000000..c1e8dd1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.pojo.PojoDataType;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+
+public class StaticDrillTable extends DrillTable{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StaticDrillTable.class);
+
+ private final PojoDataType type;
+
+ public StaticDrillTable(PojoDataType type, String storageEngineName, StoragePlugin plugin, Object selection) {
+ super(storageEngineName, plugin, selection);
+ this.type = type;
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return type.getRowType(typeFactory);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
new file mode 100644
index 0000000..4301f12
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.store.pojo.PojoDataType;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+
+public enum SystemTable {
+ OPTION("options", OptionValue.class),
+ DRILLBITS("drillbits", DrillbitIterator.DrillbitInstance.class)
+ ;
+
+ private final PojoDataType type;
+ private final String tableName;
+ private final Class<?> pojoClass;
+
+ SystemTable(String tableName, Class<?> clazz){
+ this.type = new PojoDataType(clazz);
+ this.tableName = tableName;
+ this.pojoClass = clazz;
+ }
+
+ public String getTableName(){
+ return tableName;
+ }
+
+ public RelDataType getRowType(RelDataTypeFactory f){
+ return type.getRowType(f);
+ }
+
+ public PojoDataType getType(){
+ return type;
+ }
+
+ public Class<?> getPojoClass(){
+ return pojoClass;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
new file mode 100644
index 0000000..a1bec1e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.pojo.PojoRecordReader;
+
+public class SystemTableBatchCreator implements BatchCreator<SystemTableScan>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableBatchCreator.class);
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public RecordBatch getBatch(FragmentContext context, SystemTableScan scan, List<RecordBatch> children)
+ throws ExecutionSetupException {
+ Iterator<Object> iter = scan.getPlugin().getRecordIterator(context, scan.getTable());
+ PojoRecordReader reader = new PojoRecordReader(scan.getTable().getPojoClass(), iter);
+
+ return new ScanBatch(scan, context, Collections.singleton( (RecordReader) reader).iterator());
+ }
+}
[4/6] DRILL-381: Implement SYSTEM and SESSION options.
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
new file mode 100644
index 0000000..7fb8b6c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import net.hydromatic.optiq.SchemaPlus;
+
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.rpc.user.DrillUser;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.DrillConfigIterator;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Iterables;
+
+public class SystemTablePlugin extends AbstractStoragePlugin{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePlugin.class);
+
+ private final DrillbitContext context;
+ private final String name;
+
+ public SystemTablePlugin(SystemTablePluginConfig configuration, DrillbitContext context, String name){
+ this.context = context;
+ this.name = name;
+ }
+
+ private SystemSchema schema = new SystemSchema();
+
+ @Override
+ public StoragePluginConfig getConfig() {
+ return SystemTablePluginConfig.INSTANCE;
+ }
+
+ @Override
+ public void registerSchemas(DrillUser user, SchemaPlus parent) {
+ parent.add(schema.getName(), schema);
+ }
+
+ public Iterator<Object> getRecordIterator(FragmentContext context, SystemTable table){
+ switch(table){
+ case DRILLBITS:
+ return new DrillbitIterator(context);
+ case OPTION:
+
+ return Iterables.concat((Iterable<Object>)(Object) new DrillConfigIterator(context.getConfig()), //
+ context.getOptions()).iterator();
+ default:
+ throw new UnsupportedOperationException("Unable to create record iterator for table: " + table.getTableName());
+ }
+ }
+
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+ SystemTable table = selection.getWith(context.getConfig(), SystemTable.class);
+ return new SystemTableScan(table, this);
+ }
+
+ private class SystemSchema extends AbstractSchema{
+
+ private Set<String> tableNames;
+
+ public SystemSchema() {
+ super("sys");
+ Set<String> names = Sets.newHashSet();
+ for(SystemTable t : SystemTable.values()){
+ names.add(t.getTableName());
+ }
+ this.tableNames = ImmutableSet.copyOf(names);
+ }
+
+ @Override
+ public Set<String> getTableNames() {
+ return tableNames;
+ }
+
+
+ @Override
+ public DrillTable getTable(String name) {
+ for(SystemTable table : SystemTable.values()){
+ if(table.getTableName().equalsIgnoreCase(name)){
+ return new StaticDrillTable(table.getType(), SystemTablePlugin.this.name, SystemTablePlugin.this, table);
+ }
+ }
+ return null;
+
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
new file mode 100644
index 0000000..bca9881
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+public class SystemTablePluginConfig implements StoragePluginConfig{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePluginConfig.class);
+
+ String name = "system-tables";
+
+ public static SystemTablePluginConfig INSTANCE = new SystemTablePluginConfig();
+
+ private SystemTablePluginConfig(){
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
new file mode 100644
index 0000000..9a745ac
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import parquet.org.codehaus.jackson.annotate.JsonCreator;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("sys")
+public class SystemTableScan extends AbstractGroupScan implements SubScan{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableScan.class);
+
+ private final SystemTable table;
+ private final SystemTablePlugin plugin;
+
+ @JsonCreator
+ public SystemTableScan( //
+ @JsonProperty("table") SystemTable table, //
+ @JacksonInject StoragePluginRegistry engineRegistry //
+ ) throws IOException, ExecutionSetupException {
+ this.table = table;
+ this.plugin = (SystemTablePlugin) engineRegistry.getPlugin(SystemTablePluginConfig.INSTANCE);
+ }
+
+ public SystemTableScan(SystemTable table, SystemTablePlugin plugin){
+ this.table = table;
+ this.plugin = plugin;
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return new OperatorCost(1,1,1,1);
+ }
+
+ @Override
+ public Size getSize() {
+ return new Size(100,1);
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ return this;
+ }
+
+ @Override
+ public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+ }
+
+ @Override
+ public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+ return this;
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ return 1;
+ }
+
+ @Override
+ public long getInitialAllocation() {
+ return initialAllocation;
+ }
+
+ @Override
+ public long getMaxAllocation() {
+ return maxAllocation;
+ }
+
+ @Override
+ public String getDigest() {
+ return "SystemTableScan: " + table.name();
+ }
+
+ @Override
+ public List<EndpointAffinity> getOperatorAffinity() {
+ return Collections.emptyList();
+ }
+
+
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ return this;
+ }
+
+ public SystemTable getTable() {
+ return table;
+ }
+
+ public SystemTablePlugin getPlugin() {
+ return plugin;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 835adad..e7accba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -49,21 +49,21 @@ import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
public class ControlHandlerImpl implements ControlMessageHandler {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class);
-
+
private final WorkerBee bee;
-
+
public ControlHandlerImpl(WorkerBee bee) {
super();
this.bee = bee;
}
-
+
@Override
public Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Received bit com message of type {}", rpcType);
switch (rpcType) {
-
+
case RpcType.REQ_CANCEL_FRAGMENT_VALUE:
FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
cancelFragment(handle);
@@ -80,28 +80,28 @@ public class ControlHandlerImpl implements ControlMessageHandler {
startNewRemoteFragment(fragment);
return DataRpcConfig.OK;
- } catch (OutOfMemoryException e) {
- logger.error("Failure while attempting to start remote fragment.", fragment, e);
+ } catch (ExecutionSetupException e) {
+ logger.error("Failure while attempting to start remote fragment.", fragment);
return new Response(RpcType.ACK, Acks.FAIL);
}
-
+
default:
throw new RpcException("Not yet supported.");
}
}
-
-
-
+
+
+
/* (non-Javadoc)
* @see org.apache.drill.exec.work.batch.BitComHandler#startNewRemoteFragment(org.apache.drill.exec.proto.ExecProtos.PlanFragment)
*/
@Override
- public void startNewRemoteFragment(PlanFragment fragment) throws OutOfMemoryException{
+ public void startNewRemoteFragment(PlanFragment fragment) throws ExecutionSetupException{
logger.debug("Received remote fragment start instruction", fragment);
FragmentContext context = new FragmentContext(bee.getContext(), fragment, null, new FunctionImplementationRegistry(bee.getContext().getConfig()));
ControlTunnel tunnel = bee.getContext().getController().getTunnel(fragment.getForeman());
-
+
NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
try{
FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson());
@@ -117,15 +117,15 @@ public class ControlHandlerImpl implements ControlMessageHandler {
listener.fail(fragment.getHandle(), "Failure due to uncaught exception", e);
} catch (OutOfMemoryError t) {
if(t.getMessage().startsWith("Direct buffer")){
- listener.fail(fragment.getHandle(), "Failure due to error", t);
+ listener.fail(fragment.getHandle(), "Failure due to error", t);
}else{
throw t;
}
-
+
}
-
+
}
-
+
/* (non-Javadoc)
* @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
*/
@@ -141,10 +141,10 @@ public class ControlHandlerImpl implements ControlMessageHandler {
FragmentExecutor runner = bee.getFragmentRunner(handle);
if(runner != null) runner.cancel();
}
-
+
return Acks.OK;
}
-
-
-
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 95f2dc6..d00478b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.work.batch;
import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -26,14 +27,13 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.ControlConnection;
-import org.apache.drill.exec.work.fragment.FragmentManager;
public interface ControlMessageHandler {
public abstract Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody)
throws RpcException;
- public abstract void startNewRemoteFragment(PlanFragment fragment) throws OutOfMemoryException;
+ public abstract void startNewRemoteFragment(PlanFragment fragment) throws OutOfMemoryException, ExecutionSetupException;
public abstract Ack cancelFragment(FragmentHandle handle);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 6027c44..e73ddde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -21,8 +21,11 @@ import io.netty.buffer.ByteBuf;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Iterator;
import java.util.List;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.ExpressionPosition;
@@ -31,7 +34,9 @@ import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
+import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.DistributedMultiMap;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.exception.OptimizerException;
import org.apache.drill.exec.ops.QueryContext;
@@ -49,6 +54,7 @@ import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
@@ -186,6 +192,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
}
}
+
private void parseAndRunLogicalPlan(String json) {
try {
@@ -260,6 +267,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
}
}
+
private void runPhysicalPlan(PhysicalPlan plan) {
if(plan.getProperties().resultMode != ResultMode.EXEC){
@@ -280,7 +288,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
SimpleParallelizer parallelizer = new SimpleParallelizer();
try {
- QueryWorkUnit work = parallelizer.getFragments(context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(),
+ QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getSessionOptionList(), context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(),
context.getPlanReader(), rootFragment, planningSet, context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH),
context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT));
@@ -329,7 +337,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
private PhysicalPlan convert(LogicalPlan plan) throws OptimizerException {
if(logger.isDebugEnabled()) logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig()));
- return new BasicOptimizer(DrillConfig.create(), context).optimize(new BasicOptimizer.BasicOptimizationContext(), plan);
+ return new BasicOptimizer(DrillConfig.create(), context, initiatingClient).optimize(new BasicOptimizer.BasicOptimizationContext(context), plan);
}
public QueryResult getResult(UserClientConnection connection, RequestResults req) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index c8f2021..5cad658 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -60,8 +60,8 @@ public class NonRootFragmentManager implements FragmentManager {
this.context.setBuffers(buffers);
this.runnerListener = new NonRootStatusReporter(this.context, context.getController().getTunnel(fragment.getForeman()));
this.reader = context.getPlanReader();
-
- }catch(IOException e){
+
+ }catch(ExecutionSetupException | IOException e){
throw new FragmentSetupException("Failure while decoding fragment.", e);
}
}
@@ -92,7 +92,7 @@ public class NonRootFragmentManager implements FragmentManager {
return null;
}
}
-
+
}
/* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 2b8779a..4f5e2e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.proto.UserProtos.RequestResults;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.foreman.Foreman;
@@ -74,4 +75,8 @@ public class UserWorker{
public SchemaFactory getSchemaFactory(){
return bee.getContext().getSchemaFactory();
}
+
+ public OptionManager getSystemOptions(){
+ return bee.getContext().getOptionManager();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index 948c763..6770ee7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -39,6 +39,9 @@ import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.SessionOptionManager;
+import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.junit.Rule;
import org.junit.rules.TestRule;
@@ -51,7 +54,7 @@ import com.google.common.io.Resources;
public class PlanningBase extends ExecTest{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlanningBase.class);
- @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(30000);
+ @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(10000);
@Mocked DrillbitContext dbContext;
@Mocked QueryContext context;
@@ -68,6 +71,10 @@ public class PlanningBase extends ExecTest{
final DistributedCache cache = new LocalCache();
cache.run();
+ final SystemOptionManager opt = new SystemOptionManager(cache);
+ opt.init();
+ final OptionManager sess = new SessionOptionManager(opt);
+
new NonStrictExpectations() {
{
dbContext.getMetrics();
@@ -76,6 +83,8 @@ public class PlanningBase extends ExecTest{
result = new TopLevelAllocator();
dbContext.getConfig();
result = config;
+ dbContext.getOptionManager();
+ result = opt;
dbContext.getCache();
result = cache;
}
@@ -88,6 +97,7 @@ public class PlanningBase extends ExecTest{
registry.getSchemaFactory().registerSchemas(null, root);
+
new NonStrictExpectations() {
{
context.getNewDefaultSchema();
@@ -97,13 +107,15 @@ public class PlanningBase extends ExecTest{
context.getFunctionRegistry();
result = functionRegistry;
context.getSession();
- result = new UserSession(null, null);
+ result = new UserSession(null, null, null);
context.getCurrentEndpoint();
result = DrillbitEndpoint.getDefaultInstance();
context.getActiveEndpoints();
result = ImmutableList.of(DrillbitEndpoint.getDefaultInstance());
context.getPlannerSettings();
- result = new PlannerSettings();
+ result = new PlannerSettings(sess);
+ context.getOptions();
+ result = sess;
context.getConfig();
result = config;
context.getCache();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
index 1ccb65c..a459bef 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
@@ -20,10 +20,11 @@ package org.apache.drill;
import org.junit.Ignore;
import org.junit.Test;
+@Ignore // DRILL-648
public class TestTpchSingleMode extends BaseTestQuery{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchSingleMode.class);
- private static final String SINGLE_MODE = "ALTER SESSION SET NO_EXCHANGES = true;";
+ private static final String SINGLE_MODE = "ALTER SESSION SET `planner.disable_exchanges` = true;";
private void testSingleMode(String fileName) throws Exception{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 4b2378d..41b00da 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -109,11 +109,11 @@ public class TestOptiqPlans extends ExecTest {
};
RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus);
- QueryContext qc = new QueryContext(new UserSession(null, null), QueryId.getDefaultInstance(), bitContext);
+ QueryContext qc = new QueryContext(new UserSession(null, null, null), QueryId.getDefaultInstance(), bitContext);
PhysicalPlanReader reader = bitContext.getPlanReader();
LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
- PhysicalPlan pp = new BasicOptimizer(DrillConfig.create(), qc).optimize(
- new BasicOptimizer.BasicOptimizationContext(), plan);
+ PhysicalPlan pp = new BasicOptimizer(DrillConfig.create(), qc, connection).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);
+
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
FragmentContext fctxt = new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
index 37e350e..1b38dce 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.planner.fragment.StatsCollector;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.junit.Test;
@@ -34,17 +35,17 @@ import com.google.common.collect.Lists;
public class TestFragmentChecker extends PopUnitTestBase{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestFragmentChecker.class);
-
-
+
+
@Test
public void checkSimpleExchangePlan() throws Exception{
print("/physical_double_exchange.json", 2, 3);
}
-
-
+
+
private void print(String fragmentFile, int bitCount, int exepectedFragmentCount) throws Exception{
-
+
System.out.println(String.format("=================Building plan fragments for [%s]. Allowing %d total Drillbits.==================", fragmentFile, bitCount));
PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
Fragment fragmentRoot = getRootFragment(ppr, fragmentFile);
@@ -54,27 +55,27 @@ public class TestFragmentChecker extends PopUnitTestBase{
DrillbitEndpoint localBit = null;
for(int i =0; i < bitCount; i++){
DrillbitEndpoint b1 = DrillbitEndpoint.newBuilder().setAddress("localhost").setControlPort(1234+i).build();
- if(i ==0) localBit = b1;
+ if(i ==0) localBit = b1;
endpoints.add(b1);
}
-
-
- QueryWorkUnit qwu = par.getFragments(localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10, 5);
+
+
+ QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10, 5);
System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId()));
-
+
System.out.print(qwu.getRootFragment().getFragmentJson());
-
-
+
+
for(PlanFragment f : qwu.getFragments()){
System.out.println(String.format("=========Fragment [%d:%d]=====", f.getHandle().getMajorFragmentId(), f.getHandle().getMinorFragmentId()));
System.out.print(f.getFragmentJson());
}
-
+
//assertEquals(exepectedFragmentCount, qwu.getFragments().size());
logger.debug("Planning Set {}", planningSet);
}
-
+
@Test
public void validateSingleExchangeFragment() throws Exception{
print("/physical_single_exchange.json", 1, 2);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java
new file mode 100644
index 0000000..c522da8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+public class TestOptions extends BaseTestQuery{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOptions.class);
+
+ @Test
+ public void testDrillbits() throws Exception{
+ test("select * from sys.drillbits;");
+ }
+
+ @Test
+ public void testOptions() throws Exception{
+ test(
+ "select * from sys.options;" +
+ "ALTER SYSTEM set `planner.disable_exchanges` = true;" +
+ "select * from sys.options;" +
+ "ALTER SESSION set `planner.disable_exchanges` = true;" +
+ "select * from sys.options;"
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/resources/server/options_session_check.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/server/options_session_check.json b/exec/java-exec/src/test/resources/server/options_session_check.json
new file mode 100644
index 0000000..6cb80fd
--- /dev/null
+++ b/exec/java-exec/src/test/resources/server/options_session_check.json
@@ -0,0 +1,20 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"options-reader-group-scan"
+ },
+ {
+ @id: 2,
+ child: 1,
+ pop: "screen"
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/resources/server/options_set.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/server/options_set.json b/exec/java-exec/src/test/resources/server/options_set.json
new file mode 100644
index 0000000..dda35fc
--- /dev/null
+++ b/exec/java-exec/src/test/resources/server/options_set.json
@@ -0,0 +1,24 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ },
+ options : {
+ &REPLACED_IN_TEST&
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"options-reader-group-scan"
+ },
+ {
+ @id: 2,
+ child: 1,
+ pop: "screen"
+ }
+ ]
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
index f70ddca..8130a33 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
@@ -272,6 +272,8 @@ public class TestJdbcQuery extends JdbcTest{
"TABLE_SCHEMA=hive.default; TABLE_NAME=kv\n" +
"TABLE_SCHEMA=hive.db1; TABLE_NAME=kv_db1\n" +
"TABLE_SCHEMA=hive; TABLE_NAME=kv\n" +
+ "TABLE_SCHEMA=sys; TABLE_NAME=drillbits\n" +
+ "TABLE_SCHEMA=sys; TABLE_NAME=options\n" +
"TABLE_SCHEMA=INFORMATION_SCHEMA; TABLE_NAME=VIEWS\n" +
"TABLE_SCHEMA=INFORMATION_SCHEMA; TABLE_NAME=COLUMNS\n" +
"TABLE_SCHEMA=INFORMATION_SCHEMA; TABLE_NAME=TABLES\n" +
@@ -322,6 +324,7 @@ public class TestJdbcQuery extends JdbcTest{
"SCHEMA_NAME=dfs\n" +
"SCHEMA_NAME=cp.default\n" +
"SCHEMA_NAME=cp\n" +
+ "SCHEMA_NAME=sys\n" +
"SCHEMA_NAME=INFORMATION_SCHEMA\n";
JdbcAssert.withNoDefaultSchema().sql("SHOW DATABASES").returns(expected);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
index fe37521..a1cf6f8 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
@@ -3155,6 +3155,21 @@ public final class BitControl {
* <code>optional int32 time_zone = 16;</code>
*/
int getTimeZone();
+
+ // optional string options_json = 17;
+ /**
+ * <code>optional string options_json = 17;</code>
+ */
+ boolean hasOptionsJson();
+ /**
+ * <code>optional string options_json = 17;</code>
+ */
+ java.lang.String getOptionsJson();
+ /**
+ * <code>optional string options_json = 17;</code>
+ */
+ com.google.protobuf.ByteString
+ getOptionsJsonBytes();
}
/**
* Protobuf type {@code exec.bit.control.PlanFragment}
@@ -3309,6 +3324,11 @@ public final class BitControl {
timeZone_ = input.readInt32();
break;
}
+ case 138: {
+ bitField0_ |= 0x00004000;
+ optionsJson_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3648,6 +3668,49 @@ public final class BitControl {
return timeZone_;
}
+ // optional string options_json = 17;
+ public static final int OPTIONS_JSON_FIELD_NUMBER = 17;
+ private java.lang.Object optionsJson_;
+ /**
+ * <code>optional string options_json = 17;</code>
+ */
+ public boolean hasOptionsJson() {
+ return ((bitField0_ & 0x00004000) == 0x00004000);
+ }
+ /**
+ * <code>optional string options_json = 17;</code>
+ */
+ public java.lang.String getOptionsJson() {
+ java.lang.Object ref = optionsJson_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ optionsJson_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>optional string options_json = 17;</code>
+ */
+ public com.google.protobuf.ByteString
+ getOptionsJsonBytes() {
+ java.lang.Object ref = optionsJson_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ optionsJson_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
handle_ = org.apache.drill.exec.proto.ExecProtos.FragmentHandle.getDefaultInstance();
networkCost_ = 0F;
@@ -3663,6 +3726,7 @@ public final class BitControl {
queryStartTime_ = 0L;
credentials_ = org.apache.drill.exec.proto.UserBitShared.UserCredentials.getDefaultInstance();
timeZone_ = 0;
+ optionsJson_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -3718,6 +3782,9 @@ public final class BitControl {
if (((bitField0_ & 0x00002000) == 0x00002000)) {
output.writeInt32(16, timeZone_);
}
+ if (((bitField0_ & 0x00004000) == 0x00004000)) {
+ output.writeBytes(17, getOptionsJsonBytes());
+ }
getUnknownFields().writeTo(output);
}
@@ -3783,6 +3850,10 @@ public final class BitControl {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(16, timeZone_);
}
+ if (((bitField0_ & 0x00004000) == 0x00004000)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(17, getOptionsJsonBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -3947,6 +4018,8 @@ public final class BitControl {
bitField0_ = (bitField0_ & ~0x00001000);
timeZone_ = 0;
bitField0_ = (bitField0_ & ~0x00002000);
+ optionsJson_ = "";
+ bitField0_ = (bitField0_ & ~0x00004000);
return this;
}
@@ -4047,6 +4120,10 @@ public final class BitControl {
to_bitField0_ |= 0x00002000;
}
result.timeZone_ = timeZone_;
+ if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
+ to_bitField0_ |= 0x00004000;
+ }
+ result.optionsJson_ = optionsJson_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -4107,6 +4184,11 @@ public final class BitControl {
if (other.hasTimeZone()) {
setTimeZone(other.getTimeZone());
}
+ if (other.hasOptionsJson()) {
+ bitField0_ |= 0x00004000;
+ optionsJson_ = other.optionsJson_;
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -5021,6 +5103,80 @@ public final class BitControl {
return this;
}
+ // optional string options_json = 17;
+ private java.lang.Object optionsJson_ = "";
+ /**
+ * <code>optional string options_json = 17;</code>
+ */
+ public boolean hasOptionsJson() {
+ return ((bitField0_ & 0x00004000) == 0x00004000);
+ }
+ /**
+ * <code>optional string options_json = 17;</code>
+ */
+ public java.lang.String getOptionsJson() {
+ java.lang.Object ref = optionsJson_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ optionsJson_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>optional string options_json = 17;</code>
+ */
+ public com.google.protobuf.ByteString
+ getOptionsJsonBytes() {
+ java.lang.Object ref = optionsJson_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ optionsJson_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>optional string options_json = 17;</code>
+ */
+ public Builder setOptionsJson(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00004000;
+ optionsJson_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string options_json = 17;</code>
+ */
+ public Builder clearOptionsJson() {
+ bitField0_ = (bitField0_ & ~0x00004000);
+ optionsJson_ = getDefaultInstance().getOptionsJson();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string options_json = 17;</code>
+ */
+ public Builder setOptionsJsonBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00004000;
+ optionsJson_ = value;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:exec.bit.control.PlanFragment)
}
@@ -5753,7 +5909,7 @@ public final class BitControl {
"shared.DrillPBError\022\024\n\014running_time\030\t \001(" +
"\003\"k\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAIT" +
"ING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHE" +
- "D\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\"\250\003\n\014PlanF" +
+ "D\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\"\276\003\n\014PlanF" +
"ragment\022(\n\006handle\030\001 \001(\0132\030.exec.bit.Fragm",
"entHandle\022\024\n\014network_cost\030\004 \001(\002\022\020\n\010cpu_c" +
"ost\030\005 \001(\002\022\021\n\tdisk_cost\030\006 \001(\002\022\023\n\013memory_c" +
@@ -5764,16 +5920,17 @@ public final class BitControl {
"\003:\01020000000\022\034\n\007mem_max\030\r \001(\003:\0132000000000" +
"0\022\030\n\020query_start_time\030\016 \001(\003\0221\n\013credentia" +
"ls\030\017 \001(\0132\034.exec.shared.UserCredentials\022\021" +
- "\n\ttime_zone\030\020 \001(\005\"f\n\017WorkQueueStatus\022(\n\010",
- "endpoint\030\001 \001(\0132\026.exec.DrillbitEndpoint\022\024" +
- "\n\014queue_length\030\002 \001(\005\022\023\n\013report_time\030\003 \001(" +
- "\003*\332\001\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n" +
- "\007GOODBYE\020\002\022\033\n\027REQ_INIATILIZE_FRAGMENT\020\003\022" +
- "\027\n\023REQ_CANCEL_FRAGMENT\020\006\022\027\n\023REQ_FRAGMENT" +
- "_STATUS\020\007\022\022\n\016REQ_BIT_STATUS\020\010\022\030\n\024RESP_FR" +
- "AGMENT_HANDLE\020\t\022\030\n\024RESP_FRAGMENT_STATUS\020" +
- "\n\022\023\n\017RESP_BIT_STATUS\020\013B+\n\033org.apache.dri" +
- "ll.exec.protoB\nBitControlH\001"
+ "\n\ttime_zone\030\020 \001(\005\022\024\n\014options_json\030\021 \001(\t\"",
+ "f\n\017WorkQueueStatus\022(\n\010endpoint\030\001 \001(\0132\026.e" +
+ "xec.DrillbitEndpoint\022\024\n\014queue_length\030\002 \001" +
+ "(\005\022\023\n\013report_time\030\003 \001(\003*\332\001\n\007RpcType\022\r\n\tH" +
+ "ANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\033\n\027REQ_" +
+ "INIATILIZE_FRAGMENT\020\003\022\027\n\023REQ_CANCEL_FRAG" +
+ "MENT\020\006\022\027\n\023REQ_FRAGMENT_STATUS\020\007\022\022\n\016REQ_B" +
+ "IT_STATUS\020\010\022\030\n\024RESP_FRAGMENT_HANDLE\020\t\022\030\n" +
+ "\024RESP_FRAGMENT_STATUS\020\n\022\023\n\017RESP_BIT_STAT" +
+ "US\020\013B+\n\033org.apache.drill.exec.protoB\nBit" +
+ "ControlH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5803,7 +5960,7 @@ public final class BitControl {
internal_static_exec_bit_control_PlanFragment_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_exec_bit_control_PlanFragment_descriptor,
- new java.lang.String[] { "Handle", "NetworkCost", "CpuCost", "DiskCost", "MemoryCost", "FragmentJson", "Assignment", "LeafFragment", "Foreman", "MemInitial", "MemMax", "QueryStartTime", "Credentials", "TimeZone", });
+ new java.lang.String[] { "Handle", "NetworkCost", "CpuCost", "DiskCost", "MemoryCost", "FragmentJson", "Assignment", "LeafFragment", "Foreman", "MemInitial", "MemMax", "QueryStartTime", "Credentials", "TimeZone", "OptionsJson", });
internal_static_exec_bit_control_WorkQueueStatus_descriptor =
getDescriptor().getMessageTypes().get(4);
internal_static_exec_bit_control_WorkQueueStatus_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/protocol/src/main/protobuf/BitControl.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/BitControl.proto b/protocol/src/main/protobuf/BitControl.proto
index a738646..77d7e9d 100644
--- a/protocol/src/main/protobuf/BitControl.proto
+++ b/protocol/src/main/protobuf/BitControl.proto
@@ -77,6 +77,7 @@ message PlanFragment {
optional int64 query_start_time = 14; // start time of query in milliseconds
optional exec.shared.UserCredentials credentials = 15;
optional int32 time_zone = 16;
+ optional string options_json = 17;
}
message WorkQueueStatus{