You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/09/26 06:20:00 UTC

drill git commit: DRILL-5809 made the storage format of system options backwards compatible, and avoided storing unnecessary option info.

Repository: drill
Updated Branches:
  refs/heads/master 44d5cc8eb -> 8a8bf63f7


DRILL-5809 made the storage format of system options backwards compatible, and avoided storing unnecessary option info.

 - Fixed forward compatibility issue
 - Added error message for debugging
 - Fix flakey test
 - Cleaned up bad logging
 - Applied comments

closes #957


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8a8bf63f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8a8bf63f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8a8bf63f

Branch: refs/heads/master
Commit: 8a8bf63f7e9f804e761c69f8e94f34417f83c7f7
Parents: 44d5cc8
Author: Timothy Farkas <ti...@apache.org>
Authored: Thu Sep 21 22:40:46 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Mon Sep 25 22:08:45 2017 -0700

----------------------------------------------------------------------
 .../apache/drill/testutils/DirTestWatcher.java  |  61 ++++
 .../exec/server/options/BaseOptionManager.java  |   2 +-
 .../exec/server/options/OptionMetaData.java     |   2 +-
 .../drill/exec/server/options/OptionValue.java  |  60 +++-
 .../server/options/PersistedOptionValue.java    | 316 +++++++++++++++++++
 .../server/options/SystemOptionManager.java     |  34 +-
 .../java/org/apache/drill/BaseTestQuery.java    |  14 +-
 .../TestInboundImpersonationPrivileges.java     |   2 +-
 .../exec/server/options/OptionValueTest.java    |  72 +++++
 .../options/PersistedOptionValueTest.java       | 226 +++++++++++++
 .../exec/store/sys/TestPStoreProviders.java     | 121 ++++++-
 .../testing/TestCountDownLatchInjection.java    |   7 +-
 .../drill/exec/util/MiniZooKeeperCluster.java   |   4 +-
 .../test/resources/options/old_booleanopt.json  |   6 +
 .../test/resources/options/old_doubleopt.json   |   6 +
 .../src/test/resources/options/old_longopt.json |   6 +
 .../test/resources/options/old_stringopt.json   |   6 +
 ...rowgroup.filter.pushdown.threshold.sys.drill |   6 +
 .../planner.width.max_per_query.sys.drill       |   6 +
 pom.xml                                         |   1 +
 20 files changed, 910 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/common/src/test/java/org/apache/drill/testutils/DirTestWatcher.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/drill/testutils/DirTestWatcher.java b/common/src/test/java/org/apache/drill/testutils/DirTestWatcher.java
new file mode 100644
index 0000000..a5a8806
--- /dev/null
+++ b/common/src/test/java/org/apache/drill/testutils/DirTestWatcher.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.testutils;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import java.io.File;
+
+/**
+ * This JUnit {@link TestWatcher} creates a unique directory for each JUnit test in the project's
+ * target folder at the start of each test. This directory can be used as a temporary directory to store
+ * files for the test. The directory and its contents are deleted at the end of the test.
+ */
+public class DirTestWatcher extends TestWatcher {
+  private String dirPath;
+  private File dir;
+
+  @Override
+  protected void starting(Description description) {
+    dirPath = String.format("%s/target/%s/%s", new File(".").getAbsolutePath(),
+      description.getClassName(), description.getMethodName());
+    dir = new File(dirPath);
+    dir.mkdirs();
+  }
+
+  @Override
+  protected void finished(Description description) {
+    FileUtils.deleteQuietly(dir);
+  }
+
+  @Override
+  protected void failed(Throwable e, Description description) {
+    FileUtils.deleteQuietly(dir);
+  }
+
+  public String getDirPath() {
+    return dirPath;
+  }
+
+  public File getDir() {
+    return dir;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
index 26f9108..5a0fa2d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
@@ -62,7 +62,7 @@ public abstract class BaseOptionManager extends BaseOptionSet implements OptionM
     final OptionDefinition definition = getOptionDefinition(name);
     final OptionValidator validator = definition.getValidator();
     final OptionMetaData metaData = definition.getMetaData();
-    final OptionValue.AccessibleScopes type = definition.getMetaData().getType();
+    final OptionValue.AccessibleScopes type = definition.getMetaData().getAccessibleScopes();
     final OptionValue.OptionScope scope = getScope();
     checkOptionPermissions(name, type, scope);
     final OptionValue optionValue = OptionValue.create(type, name, value, scope);

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionMetaData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionMetaData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionMetaData.java
index f73c348..6d2762f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionMetaData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionMetaData.java
@@ -34,7 +34,7 @@ public class OptionMetaData {
     this.internal = internal;
   }
 
-  public OptionValue.AccessibleScopes getType() {
+  public OptionValue.AccessibleScopes getAccessibleScopes() {
     return type;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
index 0a49f5d..e14a846 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
@@ -44,6 +44,14 @@ import java.util.EnumSet;
  */
 @JsonInclude(Include.NON_NULL)
 public class OptionValue implements Comparable<OptionValue> {
+  public static final String JSON_KIND = "kind";
+  public static final String JSON_ACCESSIBLE_SCOPES = "accessibleScopes";
+  public static final String JSON_NAME = "name";
+  public static final String JSON_NUM_VAL = "num_val";
+  public static final String JSON_STRING_VAL = "string_val";
+  public static final String JSON_BOOL_VAL = "bool_val";
+  public static final String JSON_FLOAT_VAL = "float_val";
+  public static final String JSON_SCOPE = "scope";
 
   /**
    * Defines where an option can be configured.
@@ -88,20 +96,36 @@ public class OptionValue implements Comparable<OptionValue> {
   public final Double float_val;
   public final OptionScope scope;
 
-  public static OptionValue create(AccessibleScopes type, String name, long val, OptionScope scope) {
-    return new OptionValue(Kind.LONG, type, name, val, null, null, null, scope);
+  public static OptionValue create(AccessibleScopes accessibleScopes, String name, long val, OptionScope scope) {
+    return new OptionValue(Kind.LONG, accessibleScopes, name, val, null, null, null, scope);
   }
 
-  public static OptionValue create(AccessibleScopes type, String name, boolean bool, OptionScope scope) {
-    return new OptionValue(Kind.BOOLEAN, type, name, null, null, bool, null, scope);
+  public static OptionValue create(AccessibleScopes accessibleScopes, String name, boolean bool, OptionScope scope) {
+    return new OptionValue(Kind.BOOLEAN, accessibleScopes, name, null, null, bool, null, scope);
   }
 
-  public static OptionValue create(AccessibleScopes type, String name, String val, OptionScope scope) {
-    return new OptionValue(Kind.STRING, type, name, null, val, null, null, scope);
+  public static OptionValue create(AccessibleScopes accessibleScopes, String name, String val, OptionScope scope) {
+    return new OptionValue(Kind.STRING, accessibleScopes, name, null, val, null, null, scope);
   }
 
-  public static OptionValue create(AccessibleScopes type, String name, double val, OptionScope scope) {
-    return new OptionValue(Kind.DOUBLE, type, name, null, null, null, val, scope);
+  public static OptionValue create(AccessibleScopes accessibleScopes, String name, double val, OptionScope scope) {
+    return new OptionValue(Kind.DOUBLE, accessibleScopes, name, null, null, null, val, scope);
+  }
+
+  public static OptionValue create(Kind kind, AccessibleScopes accessibleScopes,
+                                   String name, String val, OptionScope scope) {
+    switch (kind) {
+      case BOOLEAN:
+        return create(accessibleScopes, name, Boolean.valueOf(val), scope);
+      case STRING:
+        return create(accessibleScopes, name, val, scope);
+      case DOUBLE:
+        return create(accessibleScopes, name, Double.parseDouble(val), scope);
+      case LONG:
+        return create(accessibleScopes, name, Long.parseLong(val), scope);
+      default:
+        throw new IllegalArgumentException(String.format("Unsupported kind %s", kind));
+    }
   }
 
   public static OptionValue create(AccessibleScopes type, String name, Object val, OptionScope scope) {
@@ -119,14 +143,14 @@ public class OptionValue implements Comparable<OptionValue> {
   }
 
   @JsonCreator
-  private OptionValue(@JsonProperty("kind") Kind kind,
-                      @JsonProperty("accessibleScopes") AccessibleScopes accessibleScopes,
-                      @JsonProperty("name") String name,
-                      @JsonProperty("num_val") Long num_val,
-                      @JsonProperty("string_val") String string_val,
-                      @JsonProperty("bool_val") Boolean bool_val,
-                      @JsonProperty("float_val") Double float_val,
-                      @JsonProperty("scope") OptionScope scope) {
+  private OptionValue(@JsonProperty(JSON_KIND) Kind kind,
+                      @JsonProperty(JSON_ACCESSIBLE_SCOPES) AccessibleScopes accessibleScopes,
+                      @JsonProperty(JSON_NAME) String name,
+                      @JsonProperty(JSON_NUM_VAL) Long num_val,
+                      @JsonProperty(JSON_STRING_VAL) String string_val,
+                      @JsonProperty(JSON_BOOL_VAL) Boolean bool_val,
+                      @JsonProperty(JSON_FLOAT_VAL) Double float_val,
+                      @JsonProperty(JSON_SCOPE) OptionScope scope) {
     Preconditions.checkArgument(num_val != null || string_val != null || bool_val != null || float_val != null);
     this.kind = kind;
     this.accessibleScopes = accessibleScopes;
@@ -158,6 +182,10 @@ public class OptionValue implements Comparable<OptionValue> {
     }
   }
 
+  public PersistedOptionValue toPersisted() {
+    return new PersistedOptionValue(kind, name, num_val, string_val, bool_val, float_val);
+  }
+
   @Override
   public int hashCode() {
     final int prime = 31;

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/PersistedOptionValue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/PersistedOptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/PersistedOptionValue.java
new file mode 100644
index 0000000..685799c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/PersistedOptionValue.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.server.options;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.ObjectCodec;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * <p>
+ * This represents a persisted {@link OptionValue}. Decoupling the {@link OptionValue} from what
+ * is persisted will prevent us from accidentally breaking backward compatibility in the future
+ * when the {@link OptionValue} changes. Additionally when we do change the format of stored options we
+ * will not have to change much code since this is already designed with backward compatibility in mind.
+ * This class is also forward compatible with the Drill Option storage format in Drill 1.11 and earlier.
+ * </p>
+ *
+ * <p>
+ * <b>Contract:</b>
+ * Only {@link PersistedOptionValue}s created from an {@link OptionValue} should be persisted.
+ * And {@link OptionValue}s should only be created from {@link PersistedOptionValue}s that are
+ * retrieved from a store.
+ * </p>
+ */
+
+// Custom Deserializer required for backward compatibility DRILL-5809
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonDeserialize(using = PersistedOptionValue.Deserializer.class)
+public class PersistedOptionValue {
+  /**
+   * This is present for forward compatibility with Drill 1.11 and earlier
+   */
+  public static final String SYSTEM_TYPE = "SYSTEM";
+  /**
+   * This constant cannot be changed for backward and forward compatibility reasons.
+   */
+  public static final String JSON_TYPE = "type";
+  /**
+   * This constant cannot be changed for backward and forward compatibility reasons.
+   */
+  public static final String JSON_KIND = "kind";
+  /**
+   * This constant cannot be changed for backward and forward compatibility reasons.
+   */
+  public static final String JSON_NAME = "name";
+  /**
+   * This constant cannot be changed for backward and forward compatibility reasons.
+   */
+  public static final String JSON_NUM_VAL = "num_val";
+  /**
+   * This constant cannot be changed for backward and forward compatibility reasons.
+   */
+  public static final String JSON_STRING_VAL = "string_val";
+  /**
+   * This constant cannot be changed for backward and forward compatibility reasons.
+   */
+  public static final String JSON_BOOL_VAL = "bool_val";
+  /**
+   * This constant cannot be changed for backward and forward compatibility reasons.
+   */
+  public static final String JSON_FLOAT_VAL = "float_val";
+
+  private String value;
+  private OptionValue.Kind kind;
+  private String name;
+  private Long num_val;
+  private String string_val;
+  private Boolean bool_val;
+  private Double float_val;
+
+  public PersistedOptionValue(String value) {
+    this.value = Preconditions.checkNotNull(value);
+  }
+
+  public PersistedOptionValue(OptionValue.Kind kind, String name,
+                              Long num_val, String string_val,
+                              Boolean bool_val, Double float_val) {
+    this.kind = kind;
+    this.name = name;
+    this.num_val = num_val;
+    this.string_val = string_val;
+    this.bool_val = bool_val;
+    this.float_val = float_val;
+
+    switch (kind) {
+      case BOOLEAN:
+        Preconditions.checkNotNull(bool_val);
+        value = bool_val.toString();
+        break;
+      case STRING:
+        Preconditions.checkNotNull(string_val);
+        value = string_val;
+        break;
+      case DOUBLE:
+        Preconditions.checkNotNull(float_val);
+        value = float_val.toString();
+        break;
+      case LONG:
+        Preconditions.checkNotNull(num_val);
+        value = num_val.toString();
+        break;
+      default:
+        throw new UnsupportedOperationException(String.format("Unsupported type %s", kind));
+    }
+  }
+
+  /**
+   * This is ignored for forward compatibility.
+   */
+  @JsonIgnore
+  public String getValue() {
+    return value;
+  }
+
+  /**
+   * This is present for forward compatibility.
+   */
+  @JsonProperty(JSON_TYPE)
+  public String getType() {
+    return SYSTEM_TYPE;
+  }
+
+  /**
+   * This is present for forward compatibility.
+   */
+  @JsonProperty(JSON_KIND)
+  public OptionValue.Kind getKind() {
+    return kind;
+  }
+
+  /**
+   * This is present for forward compatibility.
+   */
+  @JsonProperty(JSON_NAME)
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * This is present for forward compatibility.
+   */
+  @JsonProperty(JSON_NUM_VAL)
+  public Long getNumVal() {
+    return num_val;
+  }
+
+  /**
+   * This is present for forward compatibility.
+   */
+  @JsonProperty(JSON_STRING_VAL)
+  public String getStringVal() {
+    return string_val;
+  }
+
+  /**
+   * This is present for forward compatibility.
+   */
+  @JsonProperty(JSON_BOOL_VAL)
+  public Boolean getBoolVal() {
+    return bool_val;
+  }
+
+  /**
+   * This is present for forward compatibility.
+   */
+  @JsonProperty(JSON_FLOAT_VAL)
+  public Double getFloatVal() {
+    return float_val;
+  }
+
+  public OptionValue toOptionValue(final OptionDefinition optionDefinition, final OptionValue.OptionScope optionScope) {
+    Preconditions.checkNotNull(value, "The value must be defined in order for this to be converted to an " +
+    "option value");
+    final OptionValidator validator = optionDefinition.getValidator();
+    final OptionValue.Kind kind = validator.getKind();
+    final String name = validator.getOptionName();
+    final OptionValue.AccessibleScopes accessibleScopes = optionDefinition.getMetaData().getAccessibleScopes();
+    return OptionValue.create(kind, accessibleScopes, name, value, optionScope);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    PersistedOptionValue that = (PersistedOptionValue) o;
+
+    if (value != null ? !value.equals(that.value) : that.value != null) {
+      return false;
+    }
+
+    if (kind != that.kind) {
+      return false;
+    }
+
+    if (name != null ? !name.equals(that.name) : that.name != null) {
+      return false;
+    }
+
+    if (num_val != null ? !num_val.equals(that.num_val) : that.num_val != null) {
+      return false;
+    }
+
+    if (string_val != null ? !string_val.equals(that.string_val) : that.string_val != null) {
+      return false;
+    }
+
+    if (bool_val != null ? !bool_val.equals(that.bool_val) : that.bool_val != null) {
+      return false;
+    }
+
+    return float_val != null ? float_val.equals(that.float_val) : that.float_val == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = value != null ? value.hashCode() : 0;
+    result = 31 * result + (kind != null ? kind.hashCode() : 0);
+    result = 31 * result + (name != null ? name.hashCode() : 0);
+    result = 31 * result + (num_val != null ? num_val.hashCode() : 0);
+    result = 31 * result + (string_val != null ? string_val.hashCode() : 0);
+    result = 31 * result + (bool_val != null ? bool_val.hashCode() : 0);
+    result = 31 * result + (float_val != null ? float_val.hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "PersistedOptionValue{" + "value='" + value + '\'' + ", kind=" + kind + ", name='" + name +
+      '\'' + ", num_val=" + num_val + ", string_val='" + string_val + '\'' + ", bool_val=" + bool_val +
+      ", float_val=" + float_val + '}';
+  }
+
+  /**
+   * This deserializer only fetches the relevant information we care about from a store, which is the
+   * value of an option. This deserializer is essentially future proof since it only requires a value
+   * to be stored for an option.
+   */
+  public static class Deserializer extends StdDeserializer<PersistedOptionValue> {
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Deserializer.class);
+
+    private Deserializer() {
+      super(PersistedOptionValue.class);
+    }
+
+    protected Deserializer(JavaType valueType) {
+      super(valueType);
+    }
+
+    protected Deserializer(StdDeserializer<?> src) {
+      super(src);
+    }
+
+    @Override
+    public PersistedOptionValue deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JsonProcessingException {
+      ObjectCodec oc = p.getCodec();
+      JsonNode node = oc.readTree(p);
+      String value = null;
+
+      if (node.has(OptionValue.JSON_NUM_VAL)) {
+        value = node.get(OptionValue.JSON_NUM_VAL).asText();
+      }
+
+      if (node.has(OptionValue.JSON_STRING_VAL)) {
+        value = node.get(OptionValue.JSON_STRING_VAL).asText();
+      }
+
+      if (node.has(OptionValue.JSON_BOOL_VAL)) {
+        value = node.get(OptionValue.JSON_BOOL_VAL).asText();
+      }
+
+      if (node.has(OptionValue.JSON_FLOAT_VAL)) {
+        value = node.get(OptionValue.JSON_FLOAT_VAL).asText();
+      }
+
+      if (value == null) {
+        logger.error("Invalid json stored {}.", new ObjectMapper().writeValueAsString(node));
+      }
+
+      return new PersistedOptionValue(value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index d1d56a2..b0863ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.server.options;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -218,7 +216,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
     return map;
   }
 
-  private final PersistentStoreConfig<OptionValue> config;
+  private final PersistentStoreConfig<PersistedOptionValue> config;
 
   private final PersistentStoreProvider provider;
 
@@ -227,7 +225,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
    * Persistent store for options that have been changed from default.
    * NOTE: CRUD operations must use lowercase keys.
    */
-  private PersistentStore<OptionValue> options;
+  private PersistentStore<PersistedOptionValue> options;
   private CaseInsensitiveMap<OptionDefinition> definitions;
   private CaseInsensitiveMap<OptionValue> defaults;
 
@@ -239,7 +237,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
   public SystemOptionManager(final LogicalPlanPersistence lpPersistence, final PersistentStoreProvider provider,
                              final DrillConfig bootConfig, final CaseInsensitiveMap<OptionDefinition> definitions) {
     this.provider = provider;
-    this.config = PersistentStoreConfig.newJacksonBuilder(lpPersistence.getMapper(), OptionValue.class)
+    this.config = PersistentStoreConfig.newJacksonBuilder(lpPersistence.getMapper(), PersistedOptionValue.class)
           .name("sys.options")
           .build();
     this.bootConfig = bootConfig;
@@ -256,7 +254,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
   public SystemOptionManager init() throws Exception {
     options = provider.getOrCreateStore(config);
     // if necessary, deprecate and replace options from persistent store
-    for (final Entry<String, OptionValue> option : Lists.newArrayList(options.getAll())) {
+    for (final Entry<String, PersistedOptionValue> option : Lists.newArrayList(options.getAll())) {
       final String name = option.getKey();
       final OptionDefinition definition = definitions.get(name);
       if (definition == null) {
@@ -269,7 +267,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
         if (!name.equals(canonicalName)) {
           // for backwards compatibility <= 1.1, rename to lower case.
           logger.warn("Changing option name to lower case `{}`", name);
-          final OptionValue value = option.getValue();
+          final PersistedOptionValue value = option.getValue();
           options.delete(name);
           options.put(canonicalName, value);
         }
@@ -287,8 +285,13 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       buildList.put(entry.getKey(), entry.getValue());
     }
     // override if changed
-    for (final Map.Entry<String, OptionValue> entry : Lists.newArrayList(options.getAll())) {
-      buildList.put(entry.getKey(), entry.getValue());
+    for (final Map.Entry<String, PersistedOptionValue> entry : Lists.newArrayList(options.getAll())) {
+      final String name = entry.getKey();
+      final OptionDefinition optionDefinition = getOptionDefinition(name);
+      final PersistedOptionValue persistedOptionValue = entry.getValue();
+      final OptionValue optionValue = persistedOptionValue
+        .toOptionValue(optionDefinition, OptionValue.OptionScope.SYSTEM);
+      buildList.put(name, optionValue);
     }
     return buildList.values().iterator();
   }
@@ -296,10 +299,11 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
   @Override
   public OptionValue getOption(final String name) {
     // check local space (persistent store)
-    final OptionValue value = options.get(name.toLowerCase());
+    final PersistedOptionValue persistedValue = options.get(name.toLowerCase());
 
-    if (value != null) {
-      return value;
+    if (persistedValue != null) {
+      final OptionDefinition optionDefinition = getOptionDefinition(name);
+      return persistedValue.toOptionValue(optionDefinition, OptionValue.OptionScope.SYSTEM);
     }
 
     // otherwise, return default set in the validator.
@@ -323,7 +327,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       return; // if the option is not overridden, ignore setting option to default
     }
 
-    options.put(name, value);
+    options.put(name, value.toPersisted());
   }
 
   @Override
@@ -340,7 +344,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
   @Override
   public void deleteAllLocalOptions() {
     final Set<String> names = Sets.newHashSet();
-    for (final Map.Entry<String, OptionValue> entry : Lists.newArrayList(options.getAll())) {
+    for (final Map.Entry<String, PersistedOptionValue> entry : Lists.newArrayList(options.getAll())) {
       names.add(entry.getKey());
     }
     for (final String name : names) {
@@ -355,7 +359,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
     for (final Map.Entry<String, OptionDefinition> entry : definitions.entrySet()) {
       final OptionDefinition definition = entry.getValue();
       final OptionMetaData metaData = definition.getMetaData();
-      final OptionValue.AccessibleScopes type = metaData.getType();
+      final OptionValue.AccessibleScopes type = metaData.getAccessibleScopes();
       final OptionValidator validator = definition.getValidator();
       final String name = validator.getOptionName();
       final String configName = validator.getConfigProperty();

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index e30c5e9..d40233d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -92,6 +92,8 @@ public class BaseTestQuery extends ExecTest {
     {
       put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false");
       put(ExecConstants.HTTP_ENABLE, "false");
+      // Increasing retry attempts for testing
+      put(ExecConstants.UDF_RETRY_ATTEMPTS, "10");
     }
   };
 
@@ -198,6 +200,10 @@ public class BaseTestQuery extends ExecTest {
   }
 
   private static void openClient(Properties properties) throws Exception {
+    if (properties == null) {
+      properties = new Properties();
+    }
+
     allocator = RootAllocatorFactory.newRoot(config);
     serviceSet = RemoteServiceSet.getLocalServiceSet();
 
@@ -213,7 +219,13 @@ public class BaseTestQuery extends ExecTest {
       TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
     }
 
-    client = QueryTestUtil.createClient(config,  serviceSet, MAX_WIDTH_PER_NODE, properties);
+    if (!properties.containsKey(DrillProperties.DRILLBIT_CONNECTION)) {
+      properties = new Properties(properties);
+      properties.setProperty(DrillProperties.DRILLBIT_CONNECTION,
+        String.format("localhost:%s", bits[0].getUserPort()));
+    }
+
+    client = QueryTestUtil.createClient(config, serviceSet, MAX_WIDTH_PER_NODE, properties);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestInboundImpersonationPrivileges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestInboundImpersonationPrivileges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestInboundImpersonationPrivileges.java
index f452669..1c656e0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestInboundImpersonationPrivileges.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestInboundImpersonationPrivileges.java
@@ -50,7 +50,7 @@ public class TestInboundImpersonationPrivileges extends BaseTestImpersonation {
   private static boolean checkPrivileges(final String proxyName, final String targetName) {
     OptionDefinition optionDefinition = SystemOptionManager.createDefaultOptionDefinitions().get(ExecConstants.IMPERSONATION_POLICIES_KEY);
     ExecConstants.IMPERSONATION_POLICY_VALIDATOR.validate(
-        OptionValue.create(optionDefinition.getMetaData().getType(),
+        OptionValue.create(optionDefinition.getMetaData().getAccessibleScopes(),
             ExecConstants.IMPERSONATION_POLICIES_KEY,
             IMPERSONATION_POLICIES,OptionValue.OptionScope.SYSTEM), optionDefinition.getMetaData(),null);
     try {

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/OptionValueTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/OptionValueTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/OptionValueTest.java
new file mode 100644
index 0000000..5f674e8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/OptionValueTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.server.options;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OptionValueTest {
+  @Test
+  public void createBooleanKindTest() {
+    final OptionValue createdValue = OptionValue.create(
+      OptionValue.Kind.BOOLEAN, OptionValue.AccessibleScopes.ALL,
+      "myOption", "true", OptionValue.OptionScope.SYSTEM);
+
+    final OptionValue expectedValue = OptionValue.create(
+      OptionValue.AccessibleScopes.ALL, "myOption", true, OptionValue.OptionScope.SYSTEM);
+
+    Assert.assertEquals(expectedValue, createdValue);
+  }
+
+  @Test
+  public void createDoubleKindTest() {
+    final OptionValue createdValue = OptionValue.create(
+      OptionValue.Kind.DOUBLE, OptionValue.AccessibleScopes.ALL,
+      "myOption", "1.5", OptionValue.OptionScope.SYSTEM);
+
+    final OptionValue expectedValue = OptionValue.create(
+      OptionValue.AccessibleScopes.ALL, "myOption", 1.5, OptionValue.OptionScope.SYSTEM);
+
+    Assert.assertEquals(expectedValue, createdValue);
+  }
+
+  @Test
+  public void createLongKindTest() {
+    final OptionValue createdValue = OptionValue.create(
+      OptionValue.Kind.LONG, OptionValue.AccessibleScopes.ALL,
+      "myOption", "3000", OptionValue.OptionScope.SYSTEM);
+
+    final OptionValue expectedValue = OptionValue.create(
+      OptionValue.AccessibleScopes.ALL, "myOption", 3000, OptionValue.OptionScope.SYSTEM);
+
+    Assert.assertEquals(expectedValue, createdValue);
+  }
+
+  @Test
+  public void createStringKindTest() {
+    final OptionValue createdValue = OptionValue.create(
+      OptionValue.Kind.STRING, OptionValue.AccessibleScopes.ALL,
+      "myOption", "wabalubawubdub", OptionValue.OptionScope.SYSTEM);
+
+    final OptionValue expectedValue = OptionValue.create(
+      OptionValue.AccessibleScopes.ALL, "myOption", "wabalubawubdub", OptionValue.OptionScope.SYSTEM);
+
+    Assert.assertEquals(expectedValue, createdValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/PersistedOptionValueTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/PersistedOptionValueTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/PersistedOptionValueTest.java
new file mode 100644
index 0000000..182442b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/PersistedOptionValueTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.server.options;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.serialization.JacksonSerializer;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class PersistedOptionValueTest {
+  /**
+   * DRILL-5809
+   * Note: If this test breaks, you are probably breaking backward and forward compatibility. Verify with the community
+   * that breaking compatibility is acceptable and planned for.
+   * @throws IOException if a test resource cannot be read or deserialized
+   */
+  @Test
+  public void oldDeserializeTest() throws IOException {
+    testHelper("/options/old_booleanopt.json",
+      "/options/old_doubleopt.json",
+      "/options/old_longopt.json",
+      "/options/old_stringopt.json");
+  }
+
+  private void testHelper(String booleanOptionFile, String doubleOptionFile,
+                          String longOptionFile, String stringOptionFile) throws IOException {
+    JacksonSerializer serializer = new JacksonSerializer<>(new ObjectMapper(), PersistedOptionValue.class);
+    String booleanOptionJson = FileUtils.getResourceAsString(booleanOptionFile);
+    String doubleOptionJson = FileUtils.getResourceAsString(doubleOptionFile);
+    String longOptionJson = FileUtils.getResourceAsString(longOptionFile);
+    String stringOptionJson = FileUtils.getResourceAsString(stringOptionFile);
+
+    PersistedOptionValue booleanValue = (PersistedOptionValue) serializer.deserialize(booleanOptionJson.getBytes());
+    PersistedOptionValue doubleValue = (PersistedOptionValue) serializer.deserialize(doubleOptionJson.getBytes());
+    PersistedOptionValue longValue = (PersistedOptionValue) serializer.deserialize(longOptionJson.getBytes());
+    PersistedOptionValue stringValue = (PersistedOptionValue) serializer.deserialize(stringOptionJson.getBytes());
+
+    PersistedOptionValue expectedBooleanValue = new PersistedOptionValue("true");
+    PersistedOptionValue expectedDoubleValue = new PersistedOptionValue("1.5");
+    PersistedOptionValue expectedLongValue = new PersistedOptionValue("5000");
+    PersistedOptionValue expectedStringValue = new PersistedOptionValue("wabalubadubdub");
+
+    Assert.assertEquals(expectedBooleanValue, booleanValue);
+    Assert.assertEquals(expectedDoubleValue, doubleValue);
+    Assert.assertEquals(expectedLongValue, longValue);
+    Assert.assertEquals(expectedStringValue, stringValue);
+  }
+
+  @Test
+  public void valueAssignment() {
+    final String name = "myOption";
+    final String stringContent = "val1";
+    PersistedOptionValue stringValue =
+      new PersistedOptionValue(OptionValue.Kind.STRING, name, null, stringContent, null, null);
+    PersistedOptionValue numValue =
+      new PersistedOptionValue(OptionValue.Kind.LONG, name, 100L, null, null, null);
+    PersistedOptionValue boolValue =
+      new PersistedOptionValue(OptionValue.Kind.BOOLEAN, name, null, null, true, null);
+    PersistedOptionValue floatValue =
+      new PersistedOptionValue(OptionValue.Kind.DOUBLE, name, null, null, null, 55.5);
+
+    Assert.assertEquals(stringContent, stringValue.getValue());
+    Assert.assertEquals("100", numValue.getValue());
+    Assert.assertEquals("true", boolValue.getValue());
+    Assert.assertEquals("55.5", floatValue.getValue());
+  }
+
+  /**
+   * DRILL-5809 Test forward compatibility with Drill 1.11 and earlier.
+   * Note: If this test breaks, you are probably breaking forward compatibility. Verify with the community
+   * that breaking compatibility is acceptable and planned for.
+   * @throws IOException if an option value cannot be serialized or deserialized
+   */
+  @Test
+  public void testForwardCompatibility() throws IOException {
+    final String name = "myOption";
+
+    JacksonSerializer realSerializer = new JacksonSerializer<>(new ObjectMapper(), PersistedOptionValue.class);
+    JacksonSerializer mockSerializer = new JacksonSerializer<>(new ObjectMapper(), MockPersistedOptionValue.class);
+
+    final String stringContent = "val1";
+    PersistedOptionValue stringValue =
+      new PersistedOptionValue(OptionValue.Kind.STRING, name, null, stringContent, null, null);
+    PersistedOptionValue numValue =
+      new PersistedOptionValue(OptionValue.Kind.LONG, name, 100L, null, null, null);
+    PersistedOptionValue boolValue =
+      new PersistedOptionValue(OptionValue.Kind.BOOLEAN, name, null, null, true, null);
+    PersistedOptionValue floatValue =
+      new PersistedOptionValue(OptionValue.Kind.DOUBLE, name, null, null, null, 55.5);
+
+    byte[] stringValueBytes = realSerializer.serialize(stringValue);
+    byte[] numValueBytes = realSerializer.serialize(numValue);
+    byte[] boolValueBytes = realSerializer.serialize(boolValue);
+    byte[] floatValueBytes = realSerializer.serialize(floatValue);
+
+    MockPersistedOptionValue mockStringValue = (MockPersistedOptionValue) mockSerializer.deserialize(stringValueBytes);
+    MockPersistedOptionValue mockNumValue = (MockPersistedOptionValue) mockSerializer.deserialize(numValueBytes);
+    MockPersistedOptionValue mockBoolValue = (MockPersistedOptionValue) mockSerializer.deserialize(boolValueBytes);
+    MockPersistedOptionValue mockFloatValue = (MockPersistedOptionValue) mockSerializer.deserialize(floatValueBytes);
+
+    MockPersistedOptionValue expectedStringValue =
+      new MockPersistedOptionValue(PersistedOptionValue.SYSTEM_TYPE, OptionValue.Kind.STRING, name,
+        null, stringContent, null, null);
+    MockPersistedOptionValue expectedNumValue =
+      new MockPersistedOptionValue(PersistedOptionValue.SYSTEM_TYPE, OptionValue.Kind.LONG, name,
+        100L, null, null, null);
+    MockPersistedOptionValue expectedBoolValue =
+      new MockPersistedOptionValue(PersistedOptionValue.SYSTEM_TYPE, OptionValue.Kind.BOOLEAN, name,
+        null, null, true, null);
+    MockPersistedOptionValue expectedFloatValue =
+      new MockPersistedOptionValue(PersistedOptionValue.SYSTEM_TYPE, OptionValue.Kind.DOUBLE, name,
+        null, null, null, 55.5);
+
+    Assert.assertEquals(expectedStringValue, mockStringValue);
+    Assert.assertEquals(expectedNumValue, mockNumValue);
+    Assert.assertEquals(expectedBoolValue, mockBoolValue);
+    Assert.assertEquals(expectedFloatValue, mockFloatValue);
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public static class MockPersistedOptionValue {
+    public final String type;
+    public final OptionValue.Kind kind;
+    public final String name;
+    public final Long num_val;
+    public final String string_val;
+    public final Boolean bool_val;
+    public final Double float_val;
+
+    public MockPersistedOptionValue(@JsonProperty(PersistedOptionValue.JSON_TYPE) String type,
+                                    @JsonProperty(PersistedOptionValue.JSON_KIND) OptionValue.Kind kind,
+                                    @JsonProperty(PersistedOptionValue.JSON_NAME) String name,
+                                    @JsonProperty(PersistedOptionValue.JSON_NUM_VAL) Long num_val,
+                                    @JsonProperty(PersistedOptionValue.JSON_STRING_VAL) String string_val,
+                                    @JsonProperty(PersistedOptionValue.JSON_BOOL_VAL) Boolean bool_val,
+                                    @JsonProperty(PersistedOptionValue.JSON_FLOAT_VAL) Double float_val) {
+      this.type = type;
+      this.kind = kind;
+      this.name = name;
+      this.num_val = num_val;
+      this.string_val = string_val;
+      this.bool_val = bool_val;
+      this.float_val = float_val;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      MockPersistedOptionValue that = (MockPersistedOptionValue) o;
+
+      if (!type.equals(that.type)) {
+        return false;
+      }
+
+      if (kind != that.kind) {
+        return false;
+      }
+
+      if (!name.equals(that.name)) {
+        return false;
+      }
+
+      if (num_val != null ? !num_val.equals(that.num_val) : that.num_val != null) {
+        return false;
+      }
+
+      if (string_val != null ? !string_val.equals(that.string_val) : that.string_val != null) {
+        return false;
+      }
+
+      if (bool_val != null ? !bool_val.equals(that.bool_val) : that.bool_val != null) {
+        return false;
+      }
+
+      return float_val != null ? float_val.equals(that.float_val) : that.float_val == null;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = type.hashCode();
+      result = 31 * result + kind.hashCode();
+      result = 31 * result + name.hashCode();
+      result = 31 * result + (num_val != null ? num_val.hashCode() : 0);
+      result = 31 * result + (string_val != null ? string_val.hashCode() : 0);
+      result = 31 * result + (bool_val != null ? bool_val.hashCode() : 0);
+      result = 31 * result + (float_val != null ? float_val.hashCode() : 0);
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "MockPersistedOptionValue{" + "type='" + type + '\'' + ", kind=" + kind + ", name='" + name +
+        '\'' + ", num_val=" + num_val + ", string_val='" + string_val + '\'' + ", bool_val=" + bool_val +
+        ", float_val=" + float_val + '}';
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
index 0504bb7..79a0f45 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
@@ -15,21 +15,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.drill.exec.store.sys;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.TestWithZookeeper;
+import org.apache.drill.exec.coord.zk.PathUtils;
+import org.apache.drill.exec.coord.zk.ZookeeperClient;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.server.options.PersistedOptionValue;
 import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
 import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.FixtureBuilder;
+import org.apache.drill.testutils.DirTestWatcher;
+import org.apache.zookeeper.CreateMode;
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 
+import java.io.File;
+
 public class TestPStoreProviders extends TestWithZookeeper {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPStoreProviders.class);
 
+  @Rule
+  public DirTestWatcher dirTestWatcher = new DirTestWatcher();
+
   @Test
   public void verifyLocalStore() throws Exception {
     try(LocalPersistentStoreProvider provider = new LocalPersistentStoreProvider(DrillConfig.create())){
@@ -39,19 +58,103 @@ public class TestPStoreProviders extends TestWithZookeeper {
 
   @Test
   public void verifyZkStore() throws Exception {
+    try(CuratorFramework curator = createCurator()){
+      curator.start();
+      ZookeeperPersistentStoreProvider provider = new ZookeeperPersistentStoreProvider(zkHelper.getConfig(), curator);
+      PStoreTestUtil.test(provider);
+    }
+  }
+
+  /**
+   * DRILL-5809
+   * Note: If this test breaks, you are probably breaking backward and forward compatibility. Verify with the community
+   * that breaking compatibility is acceptable and planned for.
+   * @throws Exception
+   */
+  @Test
+  public void localBackwardCompatabilityTest() throws Exception {
+    localLoadTestHelper("src/test/resources/options/store/local/old/sys.options");
+  }
+
+  private void localLoadTestHelper(String propertiesDir) throws Exception {
+    File localOptionsResources = new File(propertiesDir);
+
+    String optionsDirPath = String.format("%s/sys.options", dirTestWatcher.getDirPath());
+    File optionsDir = new File(optionsDirPath);
+    optionsDir.mkdirs();
+
+    org.apache.commons.io.FileUtils.copyDirectory(localOptionsResources, optionsDir);
+
+    FixtureBuilder builder = ClusterFixture.builder().
+      configProperty(ExecConstants.HTTP_ENABLE, false).
+      configProperty(ExecConstants.SYS_STORE_PROVIDER_CLASS, LocalPersistentStoreProvider.class.getCanonicalName()).
+      configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, String.format("file://%s", dirTestWatcher.getDirPath())).
+      configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String parquetPushdown = client.queryBuilder().
+        sql("SELECT val FROM sys.%s where name='%s'",
+          SystemTable.OPTION_VAL.getTableName(),
+          PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY).
+        singletonString();
+
+      String plannerWidth = client.queryBuilder().
+        sql("SELECT val FROM sys.%s where name='%s'",
+          SystemTable.OPTION_VAL.getTableName(),
+          ExecConstants.MAX_WIDTH_GLOBAL_KEY).
+        singletonString();
+
+      Assert.assertEquals("30000", parquetPushdown);
+      Assert.assertEquals("3333", plannerWidth);
+    }
+  }
+
+  /**
+   * DRILL-5809
+   * Note: If this test breaks, you are probably breaking backward and forward compatibility. Verify with the community
+   * that breaking compatibility is acceptable and planned for.
+   * @throws Exception
+   */
+  @Test
+  public void zkBackwardCompatabilityTest() throws Exception {
+    final String oldName = "myOldOption";
+
+    try (CuratorFramework curator = createCurator()) {
+      curator.start();
+
+      PersistentStoreConfig<PersistedOptionValue> storeConfig =
+        PersistentStoreConfig.newJacksonBuilder(new ObjectMapper(), PersistedOptionValue.class).name("sys.test").build();
+
+      try (ZookeeperClient zkClient = new ZookeeperClient(curator,
+        PathUtils.join("/", storeConfig.getName()), CreateMode.PERSISTENT)) {
+        zkClient.start();
+        String oldOptionJson = FileUtils.getResourceAsString("/options/old_booleanopt.json");
+        zkClient.put(oldName, oldOptionJson.getBytes(), null);
+      }
+
+      try (ZookeeperPersistentStoreProvider provider =
+        new ZookeeperPersistentStoreProvider(zkHelper.getConfig(), curator)) {
+        PersistentStore<PersistedOptionValue> store = provider.getOrCreateStore(storeConfig);
+
+        PersistedOptionValue oldOptionValue = store.get(oldName, null);
+        PersistedOptionValue expectedValue = new PersistedOptionValue("true");
+
+        Assert.assertEquals(expectedValue, oldOptionValue);
+      }
+    }
+  }
+
+  private CuratorFramework createCurator() {
     String connect = zkHelper.getConnectionString();
     DrillConfig config = zkHelper.getConfig();
 
     CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
-    .namespace(zkHelper.getConfig().getString(ExecConstants.ZK_ROOT))
-    .retryPolicy(new RetryNTimes(1, 100))
-    .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
-    .connectString(connect);
+      .namespace(zkHelper.getConfig().getString(ExecConstants.ZK_ROOT))
+      .retryPolicy(new RetryNTimes(1, 100))
+      .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
+      .connectString(connect);
 
-    try(CuratorFramework curator = builder.build()){
-      curator.start();
-      ZookeeperPersistentStoreProvider provider = new ZookeeperPersistentStoreProvider(config, curator);
-      PStoreTestUtil.test(provider);
-    }
+    return builder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
index 44cc3a7..407cb7c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
@@ -122,7 +122,7 @@ public class TestCountDownLatchInjection extends BaseTestQuery {
 
   @Test // test would hang if the correct init, wait and countdowns did not happen, and the test timeout mechanism will
   // catch that case
-  public void latchInjected() {
+  public void latchInjected() throws InterruptedException {
     final int threads = 10;
     final ExtendedLatch trigger = new ExtendedLatch(1);
     final Pointer<Long> countingDownTime = new Pointer<>();
@@ -144,6 +144,11 @@ public class TestCountDownLatchInjection extends BaseTestQuery {
       fail("Thread should not be interrupted; there is no deliberate attempt.");
       return;
     }
+
+    while (countingDownTime.value == null) {
+      Thread.sleep(100L);
+    }
+
     assertTrue(timeSpentWaiting >= countingDownTime.value);
     try {
       queryContext.close();

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java
index f9abe7f..d0359e1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java
@@ -154,8 +154,6 @@ public class MiniZooKeeperCluster {
       NIOServerCnxnFactory standaloneServerFactory;
 
       while (true) {
-        System.out.println("Starting zookeeper " + tentativePort);
-
         try {
           standaloneServerFactory = new NIOServerCnxnFactory();
           standaloneServerFactory.configure(new InetSocketAddress(tentativePort), 1000);
@@ -171,7 +169,7 @@ public class MiniZooKeeperCluster {
         try {
           standaloneServerFactory.startup(server);
         } catch (IOException e) {
-          LOG.error("Zookeeper startupt error", e);
+          LOG.error("Zookeeper startup error", e);
           tentativePort++;
           continue;
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/test/resources/options/old_booleanopt.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/options/old_booleanopt.json b/exec/java-exec/src/test/resources/options/old_booleanopt.json
new file mode 100644
index 0000000..5614f5f
--- /dev/null
+++ b/exec/java-exec/src/test/resources/options/old_booleanopt.json
@@ -0,0 +1,6 @@
+{
+  "kind": "BOOLEAN",
+  "name": "myOldOption",
+  "bool_val": true,
+  "type": "SYSTEM"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/test/resources/options/old_doubleopt.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/options/old_doubleopt.json b/exec/java-exec/src/test/resources/options/old_doubleopt.json
new file mode 100644
index 0000000..08d3924
--- /dev/null
+++ b/exec/java-exec/src/test/resources/options/old_doubleopt.json
@@ -0,0 +1,6 @@
+{
+  "kind": "DOUBLE",
+  "name": "myOldOption",
+  "float_val": 1.5,
+  "type": "SYSTEM"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/test/resources/options/old_longopt.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/options/old_longopt.json b/exec/java-exec/src/test/resources/options/old_longopt.json
new file mode 100644
index 0000000..cd55173
--- /dev/null
+++ b/exec/java-exec/src/test/resources/options/old_longopt.json
@@ -0,0 +1,6 @@
+{
+  "kind": "LONG",
+  "name": "myOldOption",
+  "num_val": 5000,
+  "type": "SYSTEM"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/test/resources/options/old_stringopt.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/options/old_stringopt.json b/exec/java-exec/src/test/resources/options/old_stringopt.json
new file mode 100644
index 0000000..c4fe699
--- /dev/null
+++ b/exec/java-exec/src/test/resources/options/old_stringopt.json
@@ -0,0 +1,6 @@
+{
+  "kind": "STRING",
+  "name": "myOldOption",
+  "string_val": "wabalubadubdub",
+  "type": "SYSTEM"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/test/resources/options/store/local/old/sys.options/planner.store.parquet.rowgroup.filter.pushdown.threshold.sys.drill
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/options/store/local/old/sys.options/planner.store.parquet.rowgroup.filter.pushdown.threshold.sys.drill b/exec/java-exec/src/test/resources/options/store/local/old/sys.options/planner.store.parquet.rowgroup.filter.pushdown.threshold.sys.drill
new file mode 100644
index 0000000..8d89a59
--- /dev/null
+++ b/exec/java-exec/src/test/resources/options/store/local/old/sys.options/planner.store.parquet.rowgroup.filter.pushdown.threshold.sys.drill
@@ -0,0 +1,6 @@
+{
+  "kind" : "LONG",
+  "type" : "SYSTEM",
+  "name" : "planner.store.parquet.rowgroup.filter.pushdown.threshold",
+  "num_val" : 30000
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/exec/java-exec/src/test/resources/options/store/local/old/sys.options/planner.width.max_per_query.sys.drill
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/options/store/local/old/sys.options/planner.width.max_per_query.sys.drill b/exec/java-exec/src/test/resources/options/store/local/old/sys.options/planner.width.max_per_query.sys.drill
new file mode 100644
index 0000000..186d736
--- /dev/null
+++ b/exec/java-exec/src/test/resources/options/store/local/old/sys.options/planner.width.max_per_query.sys.drill
@@ -0,0 +1,6 @@
+{
+  "kind" : "LONG",
+  "type" : "SYSTEM",
+  "name" : "planner.width.max_per_query",
+  "num_val" : 3333
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/8a8bf63f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4f8ec67..e64c282 100644
--- a/pom.xml
+++ b/pom.xml
@@ -232,6 +232,7 @@
             <exclude>**/*.httpd</exclude>
             <exclude>**/*.autotools</exclude>
             <exclude>**/*.cproject</exclude>
+            <exclude>**/*.drill</exclude>
             <!-- TODO DRILL-4336: try to avoid the need to add this -->
             <exclude>dependency-reduced-pom.xml</exclude>
           </excludes>