You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/07 03:20:59 UTC
[5/6] DRILL-381: Implement SYSTEM and SESSION options.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig
new file mode 100644
index 0000000..cdfcf60
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.util.Collection;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMultiMap;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.rpc.data.DataConnectionCreator;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
+import org.apache.drill.exec.store.StoragePlugin;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Preconditions;
+
+public class DrillbitContext {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
+
+ private final BootStrapContext context;
+
+ private PhysicalPlanReader reader;
+ private final ClusterCoordinator coord;
+ private final DataConnectionCreator connectionsPool;
+ private final DistributedCache cache;
+ private final DrillbitEndpoint endpoint;
+ private final StoragePluginRegistry storagePlugins;
+ private final OperatorCreatorRegistry operatorCreatorRegistry;
+ private final Controller controller;
+ private final WorkEventBus workBus;
+ private final FunctionImplementationRegistry functionRegistry;
+<<<<<<< HEAD
+
+=======
+ private final DistributedGlobalOptions globalDrillOptions;
+
+>>>>>>> Drill 381 - implementation of session and global options.
+ public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, DistributedCache cache, WorkEventBus workBus) {
+ super();
+ Preconditions.checkNotNull(endpoint);
+ Preconditions.checkNotNull(context);
+ Preconditions.checkNotNull(controller);
+ Preconditions.checkNotNull(connectionsPool);
+ this.workBus = workBus;
+ this.controller = controller;
+ this.context = context;
+ this.coord = coord;
+ this.connectionsPool = connectionsPool;
+ this.cache = cache;
+ this.endpoint = endpoint;
+ this.storagePlugins = new StoragePluginRegistry(this);
+ this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storagePlugins);
+ this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig());
+ this.functionRegistry = new FunctionImplementationRegistry(context.getConfig());
+ this.globalDrillOptions = new DistributedGlobalOptions(this.cache);
+ }
+
+<<<<<<< HEAD
+=======
+
+>>>>>>> Drill 381 - implementation of session and global options.
+ public FunctionImplementationRegistry getFunctionImplementationRegistry() {
+ return functionRegistry;
+ }
+
+ public WorkEventBus getWorkBus(){
+ return workBus;
+ }
+<<<<<<< HEAD
+=======
+
+ public DistributedGlobalOptions getGlobalDrillOptions() {
+ return globalDrillOptions;
+ }
+>>>>>>> Drill 381 - implementation of session and global options.
+
+ public DrillbitEndpoint getEndpoint(){
+ return endpoint;
+ }
+
+ public DrillConfig getConfig() {
+ return context.getConfig();
+ }
+
+ public Collection<DrillbitEndpoint> getBits(){
+ return coord.getAvailableEndpoints();
+ }
+
+ public BufferAllocator getAllocator(){
+ return context.getAllocator();
+ }
+
+ public OperatorCreatorRegistry getOperatorCreatorRegistry() {
+ return operatorCreatorRegistry;
+ }
+
+ public StoragePluginRegistry getStorage(){
+ return this.storagePlugins;
+ }
+
+ public NioEventLoopGroup getBitLoopGroup(){
+ return context.getBitLoopGroup();
+ }
+
+
+ public DataConnectionCreator getDataConnectionsPool(){
+ return connectionsPool;
+ }
+
+ public Controller getController(){
+ return controller;
+ }
+
+ public MetricRegistry getMetrics(){
+ return context.getMetrics();
+ }
+
+ public DistributedCache getCache(){
+ return cache;
+ }
+
+ public PhysicalPlanReader getPlanReader(){
+ return reader;
+ }
+
+ public DrillSchemaFactory getSchemaFactory(){
+ return storagePlugins.getSchemaFactory();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
new file mode 100644
index 0000000..e4b03d3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.server.options.OptionValue.Kind;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+
+import com.typesafe.config.ConfigValue;
+
+public class DrillConfigIterator implements Iterable<OptionValue> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfigIterator.class);
+
+ DrillConfig c;
+ public DrillConfigIterator(DrillConfig c){
+ this.c = c;
+ }
+
+ @Override
+ public Iterator<OptionValue> iterator() {
+ return new Iter(c);
+ }
+
+ public class Iter implements Iterator<OptionValue>{
+
+ Iterator<Entry<String, ConfigValue>> entries;
+ public Iter(DrillConfig c){
+ entries = c.entrySet().iterator();
+ }
+ @Override
+ public boolean hasNext() {
+ return entries.hasNext();
+ }
+
+ @Override
+ public OptionValue next() {
+ Entry<String, ConfigValue> e = entries.next();
+ OptionValue v = new OptionValue();
+ v.name = e.getKey();
+ ConfigValue cv = e.getValue();
+ v.type = OptionType.BOOT;
+ switch(cv.valueType()){
+ case BOOLEAN:
+ v.kind = Kind.BOOLEAN;
+ v.bool_val = (Boolean) cv.unwrapped();
+ break;
+ case LIST:
+ case OBJECT:
+ case STRING:
+ v.string_val = cv.render();
+ break;
+ case NUMBER:
+ v.kind = Kind.LONG;
+ v.num_val = ((Number)cv.unwrapped()).longValue();
+ break;
+ }
+ return v;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java
new file mode 100644
index 0000000..e9620db
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.eigenbase.sql.SqlLiteral;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+public class FragmentOptionsManager implements OptionManager{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentOptionsManager.class);
+
+ ImmutableMap<String, OptionValue> options;
+ OptionManager systemOptions;
+
+ public FragmentOptionsManager(OptionManager systemOptions, OptionList options){
+ Map<String, OptionValue> tmp = Maps.newHashMap();
+ for(OptionValue v : options){
+ tmp.put(v.name, v);
+ }
+ this.options = ImmutableMap.copyOf(tmp);
+ this.systemOptions = systemOptions;
+ }
+
+ @Override
+ public Iterator<OptionValue> iterator() {
+ return Iterables.concat(systemOptions, options.values()).iterator();
+ }
+
+ @Override
+ public OptionValue getOption(String name) {
+ return null;
+ }
+
+ @Override
+ public void setOption(OptionValue value) throws SetOptionException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setOption(String name, SqlLiteral literal) throws SetOptionException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OptionAdmin getAdmin() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OptionManager getSystemManager() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OptionList getSessionOptionList() {
+ throw new UnsupportedOperationException();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
new file mode 100644
index 0000000..b6d4f8c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.ArrayList;
+
+public class OptionList extends ArrayList<OptionValue>{
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
new file mode 100644
index 0000000..3833833
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import org.eigenbase.sql.SqlLiteral;
+
+public interface OptionManager extends Iterable<OptionValue>{
+ public OptionValue getOption(String name);
+ public void setOption(OptionValue value) throws SetOptionException;
+ public void setOption(String name, SqlLiteral literal) throws SetOptionException;
+ public OptionAdmin getAdmin();
+ public OptionManager getSystemManager();
+ public OptionList getSessionOptionList();
+
+ public interface OptionAdmin{
+ public void registerOptionType(OptionValidator validator);
+ public void validate(OptionValue v) throws SetOptionException;
+ public OptionValue validate(String name, SqlLiteral value) throws SetOptionException;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
new file mode 100644
index 0000000..5b90ba5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
@@ -0,0 +1,65 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server.options;
+
+
+import org.apache.drill.common.exceptions.ExpressionParsingException;
+import org.eigenbase.sql.SqlLiteral;
+
+/**
+ * Validates the values provided to Drill options.
+ *
+ * @param <E>
+ */
+public abstract class OptionValidator {
+
+ // Stored here as well as in the option static class to allow insertion of option optionName into
+ // the error messages produced by the validator
+ private String optionName;
+
+ public OptionValidator(String optionName){
+ this.optionName = optionName;
+ }
+
+ /**
+ * This method determines if a given value is a valid setting for an option. For options that support some
+ * ambiguity in their settings, such as case-insensitivity for string options, this method returns a modified
+ * version of the passed value that is considered the standard format of the option that should be used for
+ * system-internal representation.
+ *
+ * @param value - the value to validate
+ * @return - the value requested, in its standard format to be used for representing the value within Drill
+ * Example: all lower case values for strings, to avoid ambiguities in how values are stored
+ * while allowing some flexibility for users
+ * @throws ExpressionParsingException - message to describe error with value, including range or list of expected values
+ */
+ public abstract OptionValue validate(SqlLiteral value) throws ExpressionParsingException;
+
+ public String getOptionName() {
+ return optionName;
+ }
+
+ public String getDefaultString(){
+ return null;
+ }
+
+
+ public abstract OptionValue getDefault();
+
+ public abstract void validate(OptionValue v) throws ExpressionParsingException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
new file mode 100644
index 0000000..7b4f7f6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import org.apache.drill.exec.cache.JacksonSerializable;
+
+import com.google.common.base.Preconditions;
+
+
+public class OptionValue extends JacksonSerializable {
+
+ public static enum OptionType {
+ BOOT, SYSTEM, SESSION
+ }
+
+ public static enum Kind {
+ BOOLEAN, LONG, STRING, DOUBLE
+ }
+
+ public String name;
+ public Kind kind;
+ public OptionType type;
+ public Long num_val;
+ public String string_val;
+ public Boolean bool_val;
+ public Double float_val;
+
+ public static OptionValue createLong(OptionType type, String name, long val) {
+ return new OptionValue(Kind.LONG, type, name, val, null, null, null);
+ }
+
+ public static OptionValue createBoolean(OptionType type, String name, boolean bool) {
+ return new OptionValue(Kind.BOOLEAN, type, name, null, null, bool, null);
+ }
+
+ public static OptionValue createString(OptionType type, String name, String val) {
+ return new OptionValue(Kind.STRING, type, name, null, val, null, null);
+ }
+
+ public static OptionValue createDouble(OptionType type, String name, double val) {
+ return new OptionValue(Kind.DOUBLE, type, name, null, null, null, val);
+ }
+
+ public OptionValue(){}
+
+ private OptionValue(Kind kind, OptionType type, String name, Long num_val, String string_val, Boolean bool_val, Double float_val) {
+ super();
+ Preconditions.checkArgument(num_val != null || string_val != null || bool_val != null);
+ this.name = name;
+ this.kind = kind;
+ this.float_val = float_val;
+ this.type = type;
+ this.num_val = num_val;
+ this.string_val = string_val;
+ this.bool_val = bool_val;
+ this.type = type;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((bool_val == null) ? 0 : bool_val.hashCode());
+ result = prime * result + ((float_val == null) ? 0 : float_val.hashCode());
+ result = prime * result + ((kind == null) ? 0 : kind.hashCode());
+ result = prime * result + ((name == null) ? 0 : name.hashCode());
+ result = prime * result + ((num_val == null) ? 0 : num_val.hashCode());
+ result = prime * result + ((string_val == null) ? 0 : string_val.hashCode());
+ result = prime * result + ((type == null) ? 0 : type.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ OptionValue other = (OptionValue) obj;
+ if (bool_val == null) {
+ if (other.bool_val != null)
+ return false;
+ } else if (!bool_val.equals(other.bool_val))
+ return false;
+ if (float_val == null) {
+ if (other.float_val != null)
+ return false;
+ } else if (!float_val.equals(other.float_val))
+ return false;
+ if (kind != other.kind)
+ return false;
+ if (name == null) {
+ if (other.name != null)
+ return false;
+ } else if (!name.equals(other.name))
+ return false;
+ if (num_val == null) {
+ if (other.num_val != null)
+ return false;
+ } else if (!num_val.equals(other.num_val))
+ return false;
+ if (string_val == null) {
+ if (other.string_val != null)
+ return false;
+ } else if (!string_val.equals(other.string_val))
+ return false;
+ if (type != other.type)
+ return false;
+ return true;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
new file mode 100644
index 0000000..993cead
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.eigenbase.sql.SqlLiteral;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+public class SessionOptionManager implements OptionManager{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class);
+
+ private Map<String, OptionValue> options = Maps.newConcurrentMap();
+ private OptionManager systemOptions;
+
+ public SessionOptionManager(OptionManager systemOptions) {
+ super();
+ this.systemOptions = systemOptions;
+ }
+
+ @Override
+ public Iterator<OptionValue> iterator() {
+ return Iterables.concat(systemOptions, options.values()).iterator();
+ }
+
+ @Override
+ public OptionValue getOption(String name) {
+ OptionValue opt = options.get(name);
+ if(opt == null){
+ return systemOptions.getOption(name);
+ }else{
+ return opt;
+ }
+ }
+
+ @Override
+ public void setOption(OptionValue value) {
+ systemOptions.getAdmin().validate(value);
+ setValidatedOption(value);
+ }
+
+
+ @Override
+ public OptionList getSessionOptionList() {
+ OptionList list = new OptionList();
+ for(OptionValue o : options.values()){
+ list.add(o);
+ }
+ return list;
+ }
+
+ private void setValidatedOption(OptionValue value){
+ if(value.type == OptionType.SYSTEM){
+ systemOptions.setOption(value);
+ }else{
+ options.put(value.name, value);
+ }
+ }
+
+ @Override
+ public void setOption(String name, SqlLiteral literal) {
+ OptionValue val = systemOptions.getAdmin().validate(name, literal);
+ val.type = OptionValue.OptionType.SESSION;
+ setValidatedOption(val);
+ }
+
+ @Override
+ public OptionAdmin getAdmin() {
+ return systemOptions.getAdmin();
+ }
+
+ @Override
+ public OptionManager getSystemManager() {
+ return systemOptions;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java
new file mode 100644
index 0000000..dd698c3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Set;
+
+import javax.validation.ConstraintViolation;
+
+import org.apache.drill.common.exceptions.LogicalPlanParsingException;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.LogicalOperatorBase;
+
+public class SetOptionException extends LogicalPlanParsingException{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionException.class);
+
+ public SetOptionException() {
+ super();
+
+ }
+
+ public SetOptionException(LogicalOperator operator, Set<ConstraintViolation<LogicalOperatorBase>> violations) {
+ super(operator, violations);
+
+ }
+
+ public SetOptionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+
+ }
+
+ public SetOptionException(String message, Throwable cause) {
+ super(message, cause);
+
+ }
+
+ public SetOptionException(String message) {
+ super(message);
+
+ }
+
+ public SetOptionException(Throwable cause) {
+ super(cause);
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
new file mode 100644
index 0000000..98975e4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.eigenbase.sql.SqlLiteral;
+
+import com.google.common.collect.Maps;
+
+public class SystemOptionManager implements OptionManager{
+
+ private final OptionValidator[] VALIDATORS = {
+ PlannerSettings.EXCHANGE
+ };
+
+ private DistributedMap<OptionValue> options;
+ private SystemOptionAdmin admin;
+ private final ConcurrentMap<String, OptionValidator> knownOptions = Maps.newConcurrentMap();
+ private DistributedCache cache;
+
+ public SystemOptionManager(DistributedCache cache){
+ this.cache = cache;
+ }
+
+ public void init(){
+ this.options = cache.getNamedMap("system.options", OptionValue.class);
+ this.admin = new SystemOptionAdmin();
+ }
+
+ private class Iter implements Iterator<OptionValue>{
+ private Iterator<Map.Entry<String, OptionValue>> inner;
+
+ public Iter(Iterator<Map.Entry<String, OptionValue>> inner){
+ this.inner = inner;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return inner.hasNext();
+ }
+
+ @Override
+ public OptionValue next() {
+ return inner.next().getValue();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+ @Override
+ public Iterator<OptionValue> iterator() {
+ return new Iter(options.iterator());
+ }
+
+ @Override
+ public OptionValue getOption(String name) {
+ return options.get(name);
+ }
+
+ @Override
+ public void setOption(OptionValue value) {
+ admin.validate(value);
+ assert value.type == OptionType.SYSTEM;
+ options.put(value.name, value);
+ }
+
+ @Override
+ public void setOption(String name, SqlLiteral literal) {
+ OptionValue v = admin.validate(name, literal);
+ v.type = OptionValue.OptionType.SYSTEM;
+ options.put(name, v);
+ }
+
+ @Override
+ public OptionList getSessionOptionList() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OptionManager getSystemManager() {
+ return this;
+ }
+
+ @Override
+ public OptionAdmin getAdmin() {
+ return admin;
+ }
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemOptionManager.class);
+
+
+ private class SystemOptionAdmin implements OptionAdmin{
+
+ public SystemOptionAdmin(){
+ for(OptionValidator v : VALIDATORS){
+ knownOptions.put(v.getOptionName(), v);
+ options.putIfAbsent(v.getOptionName(), v.getDefault());
+ }
+ }
+
+
+ @Override
+ public void registerOptionType(OptionValidator validator) {
+ if(null != knownOptions.putIfAbsent(validator.getOptionName(), validator) ){
+ throw new IllegalArgumentException("Only one option is allowed to be registered with name: " + validator.getOptionName());
+ }
+ }
+
+ @Override
+ public void validate(OptionValue v) throws SetOptionException {
+ OptionValidator validator = knownOptions.get(v.name);
+ if(validator == null) throw new SetOptionException("Unknown option " + v.name);
+ validator.validate(v);
+ }
+
+ @Override
+ public OptionValue validate(String name, SqlLiteral value) throws SetOptionException {
+ OptionValidator validator = knownOptions.get(name);
+ if(validator == null) throw new SetOptionException("Unknown option " + name);
+ return validator.validate(value);
+ }
+
+
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
new file mode 100644
index 0000000..0d681ea
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.math.BigDecimal;
+
+import org.apache.drill.common.exceptions.ExpressionParsingException;
+import org.apache.drill.exec.server.options.OptionValue.Kind;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.eigenbase.sql.SqlLiteral;
+
+public class TypeValidators {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TypeValidators.class);
+
+ public static class PositiveLongValidator extends LongValidator {
+
+ private final long max;
+
+ public PositiveLongValidator(String name, long max, long def) {
+ super(name, def);
+ this.max = max;
+ }
+
+ @Override
+ public void extraValidate(OptionValue v) throws ExpressionParsingException {
+ if (v.num_val > max || v.num_val < 0)
+ throw new ExpressionParsingException(String.format("Option %s must be between %d and %d.", getOptionName(), 0,
+ max));
+ }
+ }
+
+ public static class BooleanValidator extends TypeValidator{
+ public BooleanValidator(String name, boolean def){
+ super(name, Kind.BOOLEAN, OptionValue.createBoolean(OptionType.SYSTEM, name, def));
+ }
+ }
+ public static class StringValidator extends TypeValidator{
+ public StringValidator(String name, String def){
+ super(name, Kind.LONG, OptionValue.createString(OptionType.SYSTEM, name, def));
+ }
+
+ }
+ public static class LongValidator extends TypeValidator{
+ public LongValidator(String name, long def){
+ super(name, Kind.LONG, OptionValue.createLong(OptionType.SYSTEM, name, def));
+ }
+ }
+ public static class DoubleValidator extends TypeValidator{
+
+ public DoubleValidator(String name, double def){
+ super(name, Kind.DOUBLE, OptionValue.createDouble(OptionType.SYSTEM, name, def));
+ }
+
+
+ }
+
+ public static abstract class TypeValidator extends OptionValidator {
+ final Kind kind;
+ private OptionValue defaultValue;
+
+ public TypeValidator(String name, Kind kind, OptionValue defValue) {
+ super(name);
+ this.kind = kind;
+ this.defaultValue = defValue;
+ }
+
+ @Override
+ public OptionValue getDefault() {
+ return defaultValue;
+ }
+
+ @Override
+ public OptionValue validate(SqlLiteral value) throws ExpressionParsingException {
+ OptionValue op = getPartialValue(getOptionName(), (OptionType) null, value);
+ validate(op);
+ return op;
+ }
+
+ @Override
+ public final void validate(OptionValue v) throws ExpressionParsingException {
+ if (v.kind != kind)
+ throw new ExpressionParsingException(String.format("Option %s must be of type %s but you tried to set to %s.",
+ getOptionName(), kind.name(), v.kind.name()));
+ }
+
+ public void extraValidate(OptionValue v) throws ExpressionParsingException {
+ }
+
+ }
+
+ public static OptionValue getPartialValue(String name, OptionType type, SqlLiteral literal) {
+ switch (literal.getTypeName()) {
+ case DECIMAL:
+ case DOUBLE:
+ case FLOAT:
+ return OptionValue.createDouble(type, name, ((BigDecimal) literal.getValue()).doubleValue());
+
+ case SMALLINT:
+ case TINYINT:
+ case BIGINT:
+ case INTEGER:
+ return OptionValue.createLong(type, name, ((BigDecimal) literal.getValue()).longValue());
+
+ case VARBINARY:
+ case VARCHAR:
+ case CHAR:
+ return OptionValue.createString(type, name, (String) literal.getValue());
+
+ case BOOLEAN:
+ return OptionValue.createBoolean(type, name, (Boolean) literal.getValue());
+
+ }
+
+ throw new ExpressionParsingException(String.format(
+ "Drill doesn't support set option expressions with literals of type %s.", literal.getTypeName()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 4d88686..948c74f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import com.google.common.base.Preconditions;
import net.hydromatic.linq4j.expressions.DefaultExpression;
import net.hydromatic.linq4j.expressions.Expression;
import net.hydromatic.optiq.SchemaPlus;
@@ -40,10 +39,9 @@ import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.util.PathScanner;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.planner.logical.DrillRuleSets;
import org.apache.drill.exec.cache.DistributedMap;
-import org.apache.drill.exec.cache.JacksonDrillSerializable.StoragePluginsSerializable;
import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.planner.logical.DrillRuleSets;
import org.apache.drill.exec.planner.logical.StoragePlugins;
import org.apache.drill.exec.rpc.user.DrillUser;
import org.apache.drill.exec.server.DrillbitContext;
@@ -51,9 +49,12 @@ import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
+import org.apache.drill.exec.store.sys.SystemTablePlugin;
+import org.apache.drill.exec.store.sys.SystemTablePluginConfig;
import org.eigenbase.relopt.RelOptRule;
import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
@@ -91,9 +92,9 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
int i =0;
for(Constructor<?> c : plugin.getConstructors()){
Class<?>[] params = c.getParameterTypes();
- if(params.length != 3
- || params[1] != DrillbitContext.class
- || !StoragePluginConfig.class.isAssignableFrom(params[0])
+ if(params.length != 3
+ || params[1] != DrillbitContext.class
+ || !StoragePluginConfig.class.isAssignableFrom(params[0])
|| params[2] != String.class){
logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a [constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
continue;
@@ -109,7 +110,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
// create registered plugins defined in "storage-plugins.json"
this.plugins = ImmutableMap.copyOf(createPlugins());
- // query registered engines for optimizer rules and build the storage plugin RuleSet
+ // query registered engines for optimizer rules and build the storage plugin RuleSet
Builder<RelOptRule> setBuilder = ImmutableSet.builder();
for (StoragePlugin plugin : this.plugins.values()) {
Set<StoragePluginOptimizerRule> rules = plugin.getOptimizerRules();
@@ -135,17 +136,15 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
String pluginsData = Resources.toString(url, Charsets.UTF_8);
plugins = context.getConfig().getMapper().readValue(pluginsData, StoragePlugins.class);
}
- DistributedMap<StoragePluginsSerializable> map = context.getCache().getMap(StoragePluginsSerializable.class);
- StoragePluginsSerializable cachedPluginsSerializable = map.get("storage-plugins");
- if (cachedPluginsSerializable != null) {
- cachedPlugins = cachedPluginsSerializable.getObj();
+ DistributedMap<StoragePlugins> map = context.getCache().getMap(StoragePlugins.class);
+ cachedPlugins = map.get("storage-plugins");
+ if (cachedPlugins != null) {
logger.debug("Found cached storage plugin config: {}", cachedPlugins);
} else {
Preconditions.checkNotNull(plugins,"No storage plugin configuration found");
logger.debug("caching storage plugin config {}", plugins);
- map.put("storage-plugins", new StoragePluginsSerializable(context, plugins));
- cachedPluginsSerializable = map.get("storage-plugins");
- cachedPlugins = cachedPluginsSerializable.getObj();
+ map.put("storage-plugins", plugins);
+ cachedPlugins = map.get("storage-plugins");
}
if(!(plugins == null || cachedPlugins.equals(plugins))) {
logger.error("Storage plugin config mismatch. {}. {}", plugins, cachedPlugins);
@@ -155,7 +154,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
}catch(IOException e){
throw new IllegalStateException("Failure while reading storage plugins data.", e);
}
-
+
for(Map.Entry<String, StoragePluginConfig> config : cachedPlugins){
try{
StoragePlugin plugin = create(config.getKey(), config.getValue());
@@ -165,14 +164,15 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
}
}
activePlugins.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
-
+ activePlugins.put("sys", new SystemTablePlugin(SystemTablePluginConfig.INSTANCE, context, "sys"));
+
return activePlugins;
}
public StoragePlugin getPlugin(String registeredStoragePluginName) throws ExecutionSetupException {
return plugins.get(registeredStoragePluginName);
}
-
+
public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
if(config instanceof NamedStoragePluginConfig){
return plugins.get(((NamedStoragePluginConfig) config).name);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig
new file mode 100644
index 0000000..2c75651
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import net.hydromatic.linq4j.expressions.DefaultExpression;
+import net.hydromatic.linq4j.expressions.Expression;
+import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.tools.RuleSet;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.logical.DrillRuleSets;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.cache.JacksonDrillSerializable.StoragePluginsSerializable;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.rpc.user.DrillUser;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
+import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
+import org.eigenbase.relopt.RelOptRule;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSet.Builder;
+import com.google.common.io.Resources;
+import org.apache.drill.exec.store.options.OptionValueStorageConfig;
+import org.apache.drill.exec.store.options.OptionValueStoragePlugin;
+
+
+public class StoragePluginRegistry implements Iterable<Map.Entry<String, StoragePlugin>>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistry.class);
+
+ private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<Object, Constructor<? extends StoragePlugin>>();
+ private ImmutableMap<String, StoragePlugin> plugins;
+
+ private DrillbitContext context;
+ private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
+
+ private RuleSet storagePluginsRuleSet;
+
+ private static final Expression EXPRESSION = new DefaultExpression(Object.class);
+
+ public StoragePluginRegistry(DrillbitContext context) {
+ try{
+ this.context = context;
+ }catch(RuntimeException e){
+ logger.error("Failure while loading storage plugin registry.", e);
+ throw new RuntimeException("Faiure while reading and loading storage plugin configuration.", e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void init() throws DrillbitStartupException {
+ DrillConfig config = context.getConfig();
+ Collection<Class<? extends StoragePlugin>> plugins = PathScanner.scanForImplementations(StoragePlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+ logger.debug("Loading storage plugins {}", plugins);
+ for(Class<? extends StoragePlugin> plugin: plugins){
+ int i =0;
+ for(Constructor<?> c : plugin.getConstructors()){
+ Class<?>[] params = c.getParameterTypes();
+ if(params.length != 3
+ || params[1] != DrillbitContext.class
+ || !StoragePluginConfig.class.isAssignableFrom(params[0])
+ || params[2] != String.class){
+ logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a [constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
+ continue;
+ }
+ availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
+ i++;
+ }
+ if(i == 0){
+ logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters of (StorangePluginConfig, Config)", plugin.getCanonicalName());
+ }
+ }
+
+ // create registered plugins defined in "storage-plugins.json"
+ this.plugins = ImmutableMap.copyOf(createPlugins());
+
+ // query registered engines for optimizer rules and build the storage plugin RuleSet
+ Builder<RelOptRule> setBuilder = ImmutableSet.builder();
+ for (StoragePlugin plugin : this.plugins.values()) {
+ Set<StoragePluginOptimizerRule> rules = plugin.getOptimizerRules();
+ if (rules != null && rules.size() > 0) {
+ setBuilder.addAll(rules);
+ }
+ }
+ this.storagePluginsRuleSet = DrillRuleSets.create(setBuilder.build());
+ }
+
+ private Map<String, StoragePlugin> createPlugins() throws DrillbitStartupException {
+ /*
+ * Check if "storage-plugins.json" exists. Also check if "storage-plugins" object exists in Distributed Cache.
+ * If both exist, check that they are the same. If they differ, throw exception. If "storage-plugins.json" exists, but
+ * nothing found in cache, then add it to the cache. If neither are found, throw exception.
+ */
+ StoragePlugins plugins = null;
+ StoragePlugins cachedPlugins = null;
+ Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>();
+ try{
+ URL url = Resources.class.getClassLoader().getResource("storage-plugins.json");
+ if (url != null) {
+ String pluginsData = Resources.toString(url, Charsets.UTF_8);
+ plugins = context.getConfig().getMapper().readValue(pluginsData, StoragePlugins.class);
+ }
+ DistributedMap<StoragePluginsSerializable> map = context.getCache().getMap(StoragePluginsSerializable.class);
+ StoragePluginsSerializable cachedPluginsSerializable = map.get("storage-plugins");
+ if (cachedPluginsSerializable != null) {
+ cachedPlugins = cachedPluginsSerializable.getObj();
+ logger.debug("Found cached storage plugin config: {}", cachedPlugins);
+ } else {
+ Preconditions.checkNotNull(plugins,"No storage plugin configuration found");
+ logger.debug("caching storage plugin config {}", plugins);
+ map.put("storage-plugins", new StoragePluginsSerializable(context, plugins));
+ cachedPluginsSerializable = map.get("storage-plugins");
+ cachedPlugins = cachedPluginsSerializable.getObj();
+ }
+ if(!(plugins == null || cachedPlugins.equals(plugins))) {
+ logger.error("Storage plugin config mismatch. {}. {}", plugins, cachedPlugins);
+ throw new DrillbitStartupException("Storage plugin config mismatch");
+ }
+ logger.debug("using plugin config: {}", cachedPlugins);
+ }catch(IOException e){
+ throw new IllegalStateException("Failure while reading storage plugins data.", e);
+ }
+
+ for(Map.Entry<String, StoragePluginConfig> config : cachedPlugins){
+ try{
+ StoragePlugin plugin = create(config.getKey(), config.getValue());
+ activePlugins.put(config.getKey(), plugin);
+ }catch(ExecutionSetupException e){
+ logger.error("Failure while setting up StoragePlugin with name: '{}'.", config.getKey(), e);
+ }
+ }
+<<<<<<< HEAD
+ activePlugins.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
+
+ return activePlugins;
+=======
+ activeEngines.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
+ activeEngines.put("OPTIONS", new OptionValueStoragePlugin(new OptionValueStorageConfig(), context, "OPTIONS"));
+
+ return activeEngines;
+>>>>>>> Drill 381 - implementation of session and global options.
+ }
+
+ public StoragePlugin getPlugin(String registeredStoragePluginName) throws ExecutionSetupException {
+ return plugins.get(registeredStoragePluginName);
+ }
+
+ public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
+ if(config instanceof NamedStoragePluginConfig){
+ return plugins.get(((NamedStoragePluginConfig) config).name);
+ }else{
+ // TODO: for now, we'll throw away transient configs. we really ought to clean these up.
+ return create(null, config);
+ }
+ }
+
+ public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException{
+ StoragePlugin p = getPlugin(storageConfig);
+ if(!(p instanceof FileSystemPlugin)) throw new ExecutionSetupException(String.format("You tried to request a format plugin for a storage plugin that wasn't of type FileSystemPlugin. The actual type of plugin was %s.", p.getClass().getName()));
+ FileSystemPlugin storage = (FileSystemPlugin) p;
+ return storage.getFormatPlugin(formatConfig);
+ }
+
+ private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
+ StoragePlugin plugin = null;
+ Constructor<? extends StoragePlugin> c = availablePlugins.get(pluginConfig.getClass());
+ if (c == null)
+ throw new ExecutionSetupException(String.format("Failure finding StoragePlugin constructor for config %s",
+ pluginConfig));
+ try {
+ plugin = c.newInstance(pluginConfig, context, name);
+ return plugin;
+ } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
+ if (t instanceof ExecutionSetupException)
+ throw ((ExecutionSetupException) t);
+ throw new ExecutionSetupException(String.format(
+ "Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
+ }
+ }
+
+ @Override
+ public Iterator<Entry<String, StoragePlugin>> iterator() {
+ return plugins.entrySet().iterator();
+ }
+
+ public RuleSet getStoragePluginRuleSet() {
+ return storagePluginsRuleSet;
+ }
+
+ public DrillSchemaFactory getSchemaFactory(){
+ return schemaFactory;
+ }
+
+ public class DrillSchemaFactory implements SchemaFactory{
+
+ @Override
+ public void registerSchemas(DrillUser user, SchemaPlus parent) {
+ for(Map.Entry<String, StoragePlugin> e : plugins.entrySet()){
+ e.getValue().registerSchemas(user, parent);
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
index 256b6b6..eed4f03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.physical.base.SubScan;
@@ -83,4 +84,10 @@ public class DirectGroupScan extends AbstractGroupScan{
public String getDigest() {
return String.valueOf(reader);
}
+
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ return this;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
index 8ae5116..84b6690 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -29,6 +30,7 @@ import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.physical.base.SubScan;
@@ -74,13 +76,13 @@ public class MockGroupScanPOP extends AbstractGroupScan {
public List<MockScanEntry> getReadEntries() {
return readEntries;
}
-
+
public static class MockScanEntry{
private final int records;
private final MockColumn[] types;
private final int recordSize;
-
+
@JsonCreator
public MockScanEntry(@JsonProperty("records") int records, @JsonProperty("types") MockColumn[] types) {
@@ -97,7 +99,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
public OperatorCost getCost() {
return new OperatorCost(1, 2, 1, 1);
}
-
+
public int getRecords() {
return records;
}
@@ -116,7 +118,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
return "MockScanEntry [records=" + records + ", columns=" + Arrays.toString(types) + "]";
}
}
-
+
@JsonInclude(Include.NON_NULL)
public static class MockColumn{
@JsonProperty("type") public MinorType minorType;
@@ -125,8 +127,8 @@ public class MockGroupScanPOP extends AbstractGroupScan {
public Integer width;
public Integer precision;
public Integer scale;
-
-
+
+
@JsonCreator
public MockColumn(@JsonProperty("name") String name, @JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
this.name = name;
@@ -136,7 +138,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
this.precision = precision;
this.scale = scale;
}
-
+
@JsonProperty("type")
public MinorType getMinorType() {
return minorType;
@@ -156,7 +158,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
public Integer getScale() {
return scale;
}
-
+
@JsonIgnore
public MajorType getMajorType(){
MajorType.Builder b = MajorType.newBuilder();
@@ -172,7 +174,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
public String toString() {
return "MockColumn [minorType=" + minorType + ", name=" + name + ", mode=" + mode + "]";
}
-
+
}
@Override
@@ -184,7 +186,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
@Override
public void applyAssignments(List<DrillbitEndpoint> endpoints) {
Preconditions.checkArgument(endpoints.size() <= getReadEntries().size());
-
+
mappings = new LinkedList[endpoints.size()];
int i =0;
@@ -230,6 +232,11 @@ public class MockGroupScanPOP extends AbstractGroupScan {
}
@Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ return this;
+ }
+
+ @Override
public String getDigest() {
return toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
index f0d4fb6..0c9a1cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
@@ -48,7 +48,7 @@ abstract class AbstractWriter<V extends ValueVector> implements PojoWriter{
@Override
public void allocate() {
- AllocationHelper.allocate(vector, 500, 100);
+ vector.allocateNew();
}
public void setValueCount(int valueCount){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
new file mode 100644
index 0000000..8ecb29f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pojo;
+
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.sql.type.SqlTypeName;
+
+import com.google.common.collect.Lists;
+
+public class PojoDataType {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoDataType.class);
+
+ public List<SqlTypeName> types = Lists.newArrayList();
+ public List<String> names = Lists.newArrayList();
+
+ public PojoDataType(Class<?> pojoClass){
+ logger.debug(pojoClass.getName());
+ Field[] fields = pojoClass.getDeclaredFields();
+ for(int i = 0; i < fields.length; i++){
+ Field f = fields[i];
+
+ Class<?> type = f.getType();
+ names.add(f.getName());
+
+ if(type == int.class || type == Integer.class){
+ types.add(SqlTypeName.INTEGER);
+ }else if(type == boolean.class || type == Boolean.class){
+ types.add(SqlTypeName.BOOLEAN);
+ }else if(type == long.class || type == Long.class){
+ types.add(SqlTypeName.BIGINT);
+ }else if(type == double.class || type == Double.class){
+ types.add(SqlTypeName.DOUBLE);
+ }else if(type == String.class){
+ types.add(SqlTypeName.VARCHAR);
+ }else if(type.isEnum()){
+ types.add(SqlTypeName.VARCHAR);
+ }else{
+ throw new RuntimeException(String.format("PojoRecord reader doesn't yet support conversions from type [%s].", type));
+ }
+ }
+ }
+
+
+ public RelDataType getRowType(RelDataTypeFactory f){
+ List<RelDataType> fields = Lists.newArrayList();
+ for(SqlTypeName n : types){
+ fields.add(f.createSqlType(n));
+ }
+ return f.createStructType(fields, names);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index 8dac455..4203abc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -25,8 +25,14 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.pojo.Writers.BitWriter;
+import org.apache.drill.exec.store.pojo.Writers.DoubleWriter;
+import org.apache.drill.exec.store.pojo.Writers.EnumWriter;
import org.apache.drill.exec.store.pojo.Writers.IntWriter;
import org.apache.drill.exec.store.pojo.Writers.LongWriter;
+import org.apache.drill.exec.store.pojo.Writers.NBigIntWriter;
+import org.apache.drill.exec.store.pojo.Writers.NBooleanWriter;
+import org.apache.drill.exec.store.pojo.Writers.NDoubleWriter;
+import org.apache.drill.exec.store.pojo.Writers.NIntWriter;
import org.apache.drill.exec.store.pojo.Writers.StringWriter;
public class PojoRecordReader<T> implements RecordReader{
@@ -48,7 +54,7 @@ public class PojoRecordReader<T> implements RecordReader{
@Override
public void setup(OutputMutator output) throws ExecutionSetupException {
try{
- Field[] fields = pojoClass.getFields();
+ Field[] fields = pojoClass.getDeclaredFields();
writers = new PojoWriter[fields.length];
for(int i = 0; i < writers.length; i++){
Field f = fields[i];
@@ -56,6 +62,18 @@ public class PojoRecordReader<T> implements RecordReader{
if(type == int.class){
writers[i] = new IntWriter(f);
+ }else if(type == Integer.class){
+ writers[i] = new NIntWriter(f);
+ }else if(type == Long.class){
+ writers[i] = new NBigIntWriter(f);
+ }else if(type == Boolean.class){
+ writers[i] = new NBooleanWriter(f);
+ }else if(type == double.class){
+ writers[i] = new DoubleWriter(f);
+ }else if(type == Double.class){
+ writers[i] = new NDoubleWriter(f);
+ }else if(type.isEnum()){
+ writers[i] = new EnumWriter(f);
}else if(type == boolean.class){
writers[i] = new BitWriter(f);
}else if(type == long.class){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
index 6910903..b986be8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
@@ -27,7 +27,12 @@ import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.Float8Vector;
import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.NullableVarCharVector;
import com.google.common.base.Charsets;
@@ -81,18 +86,32 @@ public class Writers {
}
- public static class StringWriter extends AbstractWriter<NullableVarCharVector>{
+ public static class DoubleWriter extends AbstractWriter<Float8Vector>{
+ public DoubleWriter(Field field) {
+ super(field, Types.required(MinorType.FLOAT8));
+ if(field.getType() != double.class) throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+ double d = field.getDouble(pojo);
+
+ return vector.getMutator().setSafe(outboundIndex, d);
+ }
+
+ }
+
+ private abstract static class AbstractStringWriter extends AbstractWriter<NullableVarCharVector>{
private ByteBuf data;
private final NullableVarCharHolder h = new NullableVarCharHolder();
- public StringWriter(Field field) {
+ public AbstractStringWriter(Field field) {
super(field, Types.optional(MinorType.VARCHAR));
- if(field.getType() != String.class) throw new IllegalStateException();
ensureLength(100);
}
- private void ensureLength(int len){
+ void ensureLength(int len){
if(data == null || data.capacity() < len){
if(data != null) data.release();
data = UnpooledByteBufAllocator.DEFAULT.buffer(len);
@@ -103,11 +122,9 @@ public class Writers {
data.release();
}
- @Override
- public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
- String s = (String) field.get(pojo);
+ public boolean writeString(String s, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
if(s == null){
- h.isSet = 0;
+ return true;
}else{
h.isSet = 1;
byte[] bytes = s.getBytes(Charsets.UTF_8);
@@ -117,10 +134,108 @@ public class Writers {
h.buffer = data;
h.start = 0;
h.end = bytes.length;
+ return vector.getMutator().setSafe(outboundIndex, h);
+
+ }
+
+ }
+
+ }
+
+ public static class EnumWriter extends AbstractStringWriter{
+ public EnumWriter(Field field) {
+ super(field);
+ if(!field.getType().isEnum()) throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+ Enum<?> e= ((Enum<?>) field.get(pojo));
+ if(e == null) return true;
+ return writeString(e.name(), outboundIndex);
+ }
+ }
+
+ public static class StringWriter extends AbstractStringWriter {
+ public StringWriter(Field field) {
+ super(field);
+ if(field.getType() != String.class) throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+ String s = (String) field.get(pojo);
+ return writeString(s, outboundIndex);
+ }
+ }
+
+ public static class NIntWriter extends AbstractWriter<NullableIntVector>{
+
+ public NIntWriter(Field field) {
+ super(field, Types.optional(MinorType.INT));
+ if(field.getType() != Integer.class) throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+ Integer i = (Integer) field.get(pojo);
+ if(i != null){
+ return vector.getMutator().setSafe(outboundIndex, i);
+ }
+ return true;
+ }
+
+ }
+ public static class NBigIntWriter extends AbstractWriter<NullableBigIntVector>{
+
+ public NBigIntWriter(Field field) {
+ super(field, Types.optional(MinorType.BIGINT));
+ if(field.getType() != Long.class) throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+ Long o = (Long) field.get(pojo);
+ if(o != null){
+ return vector.getMutator().setSafe(outboundIndex, o);
}
+ return true;
+ }
- return vector.getMutator().setSafe(outboundIndex, h);
+ }
+
+ public static class NBooleanWriter extends AbstractWriter<NullableBitVector>{
+
+ public NBooleanWriter(Field field) {
+ super(field, Types.optional(MinorType.BIT));
+ if(field.getType() != Boolean.class) throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+ Boolean o = (Boolean) field.get(pojo);
+ if(o != null){
+ return vector.getMutator().setSafe(outboundIndex, o ? 1 : 0);
+ }
+ return true;
+ }
+
+ }
+ public static class NDoubleWriter extends AbstractWriter<NullableFloat8Vector>{
+
+ public NDoubleWriter(Field field) {
+ super(field, Types.optional(MinorType.FLOAT8));
+ if(field.getType() != Double.class) throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+ Double o = (Double) field.get(pojo);
+ if(o != null){
+ return vector.getMutator().setSafe(outboundIndex, o);
+ }
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
new file mode 100644
index 0000000..844fd68
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+public class DrillbitIterator implements Iterator<Object> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitIterator.class);
+
+ private Iterator<DrillbitEndpoint> endpoints;
+
+ public DrillbitIterator(FragmentContext c) {
+ this.endpoints = c.getDrillbitContext().getBits().iterator();
+ }
+
+ public static class DrillbitInstance {
+ public String host;
+ public int user_port;
+ public int control_port;
+ public int data_port;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return endpoints.hasNext();
+ }
+
+ @Override
+ public Object next() {
+ DrillbitEndpoint ep = endpoints.next();
+ DrillbitInstance i = new DrillbitInstance();
+ i.host = ep.getAddress();
+ i.user_port = ep.getUserPort();
+ i.control_port = ep.getControlPort();
+ i.data_port = ep.getDataPort();
+ return i;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
new file mode 100644
index 0000000..c1e8dd1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.pojo.PojoDataType;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+
+public class StaticDrillTable extends DrillTable{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StaticDrillTable.class);
+
+ private final PojoDataType type;
+
+ public StaticDrillTable(PojoDataType type, String storageEngineName, StoragePlugin plugin, Object selection) {
+ super(storageEngineName, plugin, selection);
+ this.type = type;
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return type.getRowType(typeFactory);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
new file mode 100644
index 0000000..4301f12
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.store.pojo.PojoDataType;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+
+public enum SystemTable {
+ OPTION("options", OptionValue.class),
+ DRILLBITS("drillbits", DrillbitIterator.DrillbitInstance.class)
+ ;
+
+ private final PojoDataType type;
+ private final String tableName;
+ private final Class<?> pojoClass;
+
+ SystemTable(String tableName, Class<?> clazz){
+ this.type = new PojoDataType(clazz);
+ this.tableName = tableName;
+ this.pojoClass = clazz;
+ }
+
+ public String getTableName(){
+ return tableName;
+ }
+
+ public RelDataType getRowType(RelDataTypeFactory f){
+ return type.getRowType(f);
+ }
+
+ public PojoDataType getType(){
+ return type;
+ }
+
+ public Class<?> getPojoClass(){
+ return pojoClass;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
new file mode 100644
index 0000000..a1bec1e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.pojo.PojoRecordReader;
+
+public class SystemTableBatchCreator implements BatchCreator<SystemTableScan>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableBatchCreator.class);
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public RecordBatch getBatch(FragmentContext context, SystemTableScan scan, List<RecordBatch> children)
+ throws ExecutionSetupException {
+ Iterator<Object> iter = scan.getPlugin().getRecordIterator(context, scan.getTable());
+ PojoRecordReader reader = new PojoRecordReader(scan.getTable().getPojoClass(), iter);
+
+ return new ScanBatch(scan, context, Collections.singleton( (RecordReader) reader).iterator());
+ }
+}