You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/03/13 17:23:52 UTC

[spark] branch master updated: [SPARK-27106][SQL] merge CaseInsensitiveStringMap and DataSourceOptions

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a80a4c  [SPARK-27106][SQL] merge CaseInsensitiveStringMap and DataSourceOptions
2a80a4c is described below

commit 2a80a4cd39c7bcee44b6f6432769ca9fdba137e4
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Mar 14 01:23:27 2019 +0800

    [SPARK-27106][SQL] merge CaseInsensitiveStringMap and DataSourceOptions
    
    ## What changes were proposed in this pull request?
    
    It's a little awkward to have 2 different classes (`CaseInsensitiveStringMap` and `DataSourceOptions`) to represent the options in the data source and catalog APIs.
    
    This PR merges these 2 classes, while keeping the name `CaseInsensitiveStringMap`, which is more precise.
    
    ## How was this patch tested?
    
    existing tests
    
    Closes #24025 from cloud-fan/option.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/kafka010/KafkaContinuousStream.scala |   3 +-
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala |   7 +-
 .../sql/kafka010/KafkaOffsetRangeCalculator.scala  |   6 +-
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  19 +-
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |   4 +-
 .../kafka010/KafkaOffsetRangeCalculatorSuite.scala |   8 +-
 .../spark/sql/util/CaseInsensitiveStringMap.java   |  66 ++++++-
 .../sql/util/CaseInsensitiveStringMapSuite.java    |  48 -----
 .../sql/util/CaseInsensitiveStringMapSuite.scala}  |  62 ++----
 .../spark/sql/sources/v2/DataSourceOptions.java    | 210 ---------------------
 .../spark/sql/sources/v2/SupportsBatchRead.java    |   5 +-
 .../spark/sql/sources/v2/SupportsBatchWrite.java   |   5 +-
 .../sql/sources/v2/SupportsContinuousRead.java     |   5 +-
 .../sql/sources/v2/SupportsMicroBatchRead.java     |   5 +-
 .../apache/spark/sql/sources/v2/SupportsRead.java  |   7 +-
 .../sql/sources/v2/SupportsStreamingWrite.java     |   5 +-
 .../apache/spark/sql/sources/v2/SupportsWrite.java |   5 +-
 .../apache/spark/sql/sources/v2/TableProvider.java |   5 +-
 .../org/apache/spark/sql/DataFrameReader.scala     |  18 +-
 .../org/apache/spark/sql/DataFrameWriter.scala     |   8 +-
 .../datasources/FallbackOrcDataSourceV2.scala      |  13 +-
 .../datasources/noop/NoopDataSource.scala          |   5 +-
 .../datasources/v2/DataSourceV2Implicits.scala     |   8 +-
 .../datasources/v2/DataSourceV2Relation.scala      |   7 +-
 .../datasources/v2/DataSourceV2Strategy.scala      |   8 +-
 .../datasources/v2/FileDataSourceV2.scala          |  12 ++
 .../sql/execution/datasources/v2/FileTable.scala   |  22 ++-
 .../datasources/v2/FileWriteBuilder.scala          |  15 +-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |  18 +-
 .../datasources/v2/orc/OrcDataSourceV2.scala       |  21 ++-
 .../datasources/v2/orc/OrcScanBuilder.scala        |   7 +-
 .../execution/datasources/v2/orc/OrcTable.scala    |  14 +-
 .../datasources/v2/orc/OrcWriteBuilder.scala       |   6 +-
 .../execution/streaming/MicroBatchExecution.scala  |   3 +-
 .../sql/execution/streaming/StreamExecution.scala  |   5 +-
 .../execution/streaming/StreamingRelation.scala    |   3 +-
 .../spark/sql/execution/streaming/console.scala    |   5 +-
 .../streaming/continuous/ContinuousExecution.scala |   6 +-
 .../continuous/ContinuousRateStreamSource.scala    |   6 +-
 .../continuous/ContinuousTextSocketSource.scala    |   4 +-
 .../spark/sql/execution/streaming/memory.scala     |   7 +-
 .../execution/streaming/sources/ConsoleWrite.scala |   4 +-
 .../streaming/sources/ForeachWriterTable.scala     |   5 +-
 .../sources/RateStreamMicroBatchStream.scala       |   6 +-
 .../streaming/sources/RateStreamProvider.scala     |   9 +-
 .../sources/TextSocketMicroBatchStream.scala       |   4 +-
 .../sources/TextSocketSourceProvider.scala         |  17 +-
 .../sql/execution/streaming/sources/memoryV2.scala |   5 +-
 .../spark/sql/streaming/DataStreamReader.scala     |   5 +-
 .../spark/sql/streaming/DataStreamWriter.scala     |   5 +-
 .../sql/sources/v2/JavaAdvancedDataSourceV2.java   |   6 +-
 .../sql/sources/v2/JavaColumnarDataSourceV2.java   |   6 +-
 .../sources/v2/JavaPartitionAwareDataSource.java   |   6 +-
 .../sources/v2/JavaReportStatisticsDataSource.java |   6 +-
 .../sources/v2/JavaSchemaRequiredDataSource.java   |   8 +-
 .../sql/sources/v2/JavaSimpleDataSourceV2.java     |   6 +-
 .../execution/datasources/orc/OrcFilterSuite.scala |   5 +-
 .../sources/RateStreamProviderSuite.scala          |  11 +-
 .../streaming/sources/TextSocketStreamSuite.scala  |  18 +-
 .../spark/sql/sources/v2/DataSourceV2Suite.scala   |  35 ++--
 .../sources/v2/FileDataSourceV2FallBackSuite.scala |  11 +-
 .../sql/sources/v2/SimpleWritableDataSource.scala  |  14 +-
 .../sources/StreamingDataSourceV2Suite.scala       |  33 ++--
 63 files changed, 363 insertions(+), 558 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index 0e61717..d60ee1c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -37,8 +37,7 @@ import org.apache.spark.sql.sources.v2.reader.streaming._
  * @param offsetReader  a reader used to get kafka offsets. Note that the actual data will be
  *                      read by per-task consumers generated later.
  * @param kafkaParams   String params for per-task Kafka consumers.
- * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
- *                      are not Kafka consumer params.
+ * @param sourceOptions Params which are not Kafka consumer params.
  * @param metadataPath Path to a directory this reader can use for writing metadata.
  * @param initialOffsets The Kafka offsets to start reading data at.
  * @param failOnDataLoss Flag indicating whether reading should fail in data loss
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index a630346..6972f39 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
 import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.UninterruptibleThread
 
 /**
@@ -57,7 +57,7 @@ import org.apache.spark.util.UninterruptibleThread
 private[kafka010] class KafkaMicroBatchStream(
     kafkaOffsetReader: KafkaOffsetReader,
     executorKafkaParams: ju.Map[String, Object],
-    options: DataSourceOptions,
+    options: CaseInsensitiveStringMap,
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
     failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
@@ -66,8 +66,7 @@ private[kafka010] class KafkaMicroBatchStream(
     "kafkaConsumer.pollTimeoutMs",
     SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
 
-  private val maxOffsetsPerTrigger =
-    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+  private val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)
 
   private val rangeCalculator = KafkaOffsetRangeCalculator(options)
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index 6008794..1af8404 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 
 /**
@@ -91,8 +91,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
 
 private[kafka010] object KafkaOffsetRangeCalculator {
 
-  def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator = {
-    val optionalValue = Option(options.get("minPartitions").orElse(null)).map(_.toInt)
+  def apply(options: CaseInsensitiveStringMap): KafkaOffsetRangeCalculator = {
+    val optionalValue = Option(options.get("minPartitions")).map(_.toInt)
     new KafkaOffsetRangeCalculator(optionalValue)
   }
 }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index b39e0d4..8496cbd 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.sources.v2.writer.WriteBuilder
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * The provider class for all Kafka readers and writers. It is designed such that it throws
@@ -103,8 +104,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       failOnDataLoss(caseInsensitiveParams))
   }
 
-  override def getTable(options: DataSourceOptions): KafkaTable = {
-    new KafkaTable(strategy(options.asMap().asScala.toMap))
+  override def getTable(options: CaseInsensitiveStringMap): KafkaTable = {
+    new KafkaTable(strategy(options.asScala.toMap))
   }
 
   /**
@@ -358,11 +359,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
 
     override def schema(): StructType = KafkaOffsetReader.kafkaSchema
 
-    override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
       override def build(): Scan = new KafkaScan(options)
     }
 
-    override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+    override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
       new WriteBuilder {
         private var inputSchema: StructType = _
 
@@ -375,20 +376,20 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
           import scala.collection.JavaConverters._
 
           assert(inputSchema != null)
-          val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
-          val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
+          val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
+          val producerParams = kafkaParamsForProducer(options.asScala.toMap)
           new KafkaStreamingWrite(topic, producerParams, inputSchema)
         }
       }
     }
   }
 
-  class KafkaScan(options: DataSourceOptions) extends Scan {
+  class KafkaScan(options: CaseInsensitiveStringMap) extends Scan {
 
     override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
 
     override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
-      val parameters = options.asMap().asScala.toMap
+      val parameters = options.asScala.toMap
       validateStreamOptions(parameters)
       // Each running query should use its own group id. Otherwise, the query may be only assigned
       // partial data since Kafka will assign partitions to multiple consumers having the same group
@@ -417,7 +418,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }
 
     override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
-      val parameters = options.asMap().asScala.toMap
+      val parameters = options.asScala.toMap
       validateStreamOptions(parameters)
       // Each running query should use its own group id. Otherwise, the query may be only assigned
       // partial data since Kafka will assign partitions to multiple consumers having the same group
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 8fd5790..21634ae 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -41,10 +41,10 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.streaming.{StreamTest, Trigger}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest {
 
@@ -1118,7 +1118,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
           "kafka.bootstrap.servers" -> testUtils.brokerAddress,
           "subscribe" -> topic
         ) ++ Option(minPartitions).map { p => "minPartitions" -> p}
-        val dsOptions = new DataSourceOptions(options.asJava)
+        val dsOptions = new CaseInsensitiveStringMap(options.asJava)
         val table = provider.getTable(dsOptions)
         val stream = table.newScanBuilder(dsOptions).build().toMicroBatchStream(dir.getAbsolutePath)
         val inputPartitions = stream.planInputPartitions(
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
index 2ccf3e2..7ffdaab 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
@@ -22,13 +22,13 @@ import scala.collection.JavaConverters._
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
 
   def testWithMinPartitions(name: String, minPartition: Int)
       (f: KafkaOffsetRangeCalculator => Unit): Unit = {
-    val options = new DataSourceOptions(Map("minPartitions" -> minPartition.toString).asJava)
+    val options = new CaseInsensitiveStringMap(Map("minPartitions" -> minPartition.toString).asJava)
     test(s"with minPartition = $minPartition: $name") {
       f(KafkaOffsetRangeCalculator(options))
     }
@@ -36,7 +36,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
 
 
   test("with no minPartition: N TopicPartitions to N offset ranges") {
-    val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty())
+    val calc = KafkaOffsetRangeCalculator(CaseInsensitiveStringMap.empty())
     assert(
       calc.getRanges(
         fromOffsets = Map(tp1 -> 1),
@@ -64,7 +64,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
   }
 
   test("with no minPartition: empty ranges ignored") {
-    val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty())
+    val calc = KafkaOffsetRangeCalculator(CaseInsensitiveStringMap.empty())
     assert(
       calc.getRanges(
         fromOffsets = Map(tp1 -> 1, tp2 -> 1),
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
index 8c5a6c6..704d90e 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
@@ -31,19 +31,20 @@ import java.util.Set;
  * This is used to pass options to v2 implementations to ensure consistent case insensitivity.
  * <p>
  * Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return
- * keys converted to lower case.
+ * keys converted to lower case. This map doesn't allow null key.
  */
 @Experimental
 public class CaseInsensitiveStringMap implements Map<String, String> {
 
   public static CaseInsensitiveStringMap empty() {
-    return new CaseInsensitiveStringMap();
+    return new CaseInsensitiveStringMap(new HashMap<>(0));
   }
 
   private final Map<String, String> delegate;
 
-  private CaseInsensitiveStringMap() {
-    this.delegate = new HashMap<>();
+  public CaseInsensitiveStringMap(Map<String, String> originalMap) {
+    this.delegate = new HashMap<>(originalMap.size());
+    putAll(originalMap);
   }
 
   @Override
@@ -56,9 +57,13 @@ public class CaseInsensitiveStringMap implements Map<String, String> {
     return delegate.isEmpty();
   }
 
+  private String toLowerCase(Object key) {
+    return key.toString().toLowerCase(Locale.ROOT);
+  }
+
   @Override
   public boolean containsKey(Object key) {
-    return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT));
+    return delegate.containsKey(toLowerCase(key));
   }
 
   @Override
@@ -68,17 +73,17 @@ public class CaseInsensitiveStringMap implements Map<String, String> {
 
   @Override
   public String get(Object key) {
-    return delegate.get(key.toString().toLowerCase(Locale.ROOT));
+    return delegate.get(toLowerCase(key));
   }
 
   @Override
   public String put(String key, String value) {
-    return delegate.put(key.toLowerCase(Locale.ROOT), value);
+    return delegate.put(toLowerCase(key), value);
   }
 
   @Override
   public String remove(Object key) {
-    return delegate.remove(key.toString().toLowerCase(Locale.ROOT));
+    return delegate.remove(toLowerCase(key));
   }
 
   @Override
@@ -107,4 +112,49 @@ public class CaseInsensitiveStringMap implements Map<String, String> {
   public Set<Map.Entry<String, String>> entrySet() {
     return delegate.entrySet();
   }
+
+  /**
+   * Returns the boolean value to which the specified key is mapped,
+   * or defaultValue if there is no mapping for the key. The key match is case-insensitive.
+   */
+  public boolean getBoolean(String key, boolean defaultValue) {
+    String value = get(key);
+    // We can't use `Boolean.parseBoolean` here, as it returns false for invalid strings.
+    if (value == null) {
+      return defaultValue;
+    } else if (value.equalsIgnoreCase("true")) {
+      return true;
+    } else if (value.equalsIgnoreCase("false")) {
+      return false;
+    } else {
+      throw new IllegalArgumentException(value + " is not a boolean string.");
+    }
+  }
+
+  /**
+   * Returns the integer value to which the specified key is mapped,
+   * or defaultValue if there is no mapping for the key. The key match is case-insensitive.
+   */
+  public int getInt(String key, int defaultValue) {
+    String value = get(key);
+    return value == null ? defaultValue : Integer.parseInt(value);
+  }
+
+  /**
+   * Returns the long value to which the specified key is mapped,
+   * or defaultValue if there is no mapping for the key. The key match is case-insensitive.
+   */
+  public long getLong(String key, long defaultValue) {
+    String value = get(key);
+    return value == null ? defaultValue : Long.parseLong(value);
+  }
+
+  /**
+   * Returns the double value to which the specified key is mapped,
+   * or defaultValue if there is no mapping for the key. The key match is case-insensitive.
+   */
+  public double getDouble(String key, double defaultValue) {
+    String value = get(key);
+    return value == null ? defaultValue : Double.parseDouble(value);
+  }
 }
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java
deleted file mode 100644
index 7639277..0000000
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.util;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class CaseInsensitiveStringMapSuite {
-  @Test
-  public void testPutAndGet() {
-    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
-    options.put("kEy", "valUE");
-
-    Assert.assertEquals("Should return correct value for lower-case key",
-        "valUE", options.get("key"));
-    Assert.assertEquals("Should return correct value for upper-case key",
-        "valUE", options.get("KEY"));
-  }
-
-  @Test
-  public void testKeySet() {
-    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
-    options.put("kEy", "valUE");
-
-    Set<String> expectedKeySet = new HashSet<>();
-    expectedKeySet.add("key");
-
-    Assert.assertEquals("Should return lower-case key set", expectedKeySet, options.keySet());
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
similarity index 52%
rename from sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
rename to sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
index cfa69a8..623ddeb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
@@ -15,31 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.sources.v2
+package org.apache.spark.sql.util
 
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkFunSuite
 
-/**
- * A simple test suite to verify `DataSourceOptions`.
- */
-class DataSourceOptionsSuite extends SparkFunSuite {
+class CaseInsensitiveStringMapSuite extends SparkFunSuite {
 
-  test("key is case-insensitive") {
-    val options = new DataSourceOptions(Map("foo" -> "bar").asJava)
-    assert(options.get("foo").get() == "bar")
-    assert(options.get("FoO").get() == "bar")
-    assert(!options.get("abc").isPresent)
+  test("put and get") {
+    val options = CaseInsensitiveStringMap.empty()
+    options.put("kEy", "valUE")
+    assert(options.get("key") == "valUE")
+    assert(options.get("KEY") == "valUE")
   }
 
-  test("value is case-sensitive") {
-    val options = new DataSourceOptions(Map("foo" -> "bAr").asJava)
-    assert(options.get("foo").get == "bAr")
+  test("key and value set") {
+    val options = new CaseInsensitiveStringMap(Map("kEy" -> "valUE").asJava)
+    assert(options.keySet().asScala == Set("key"))
+    assert(options.values().asScala.toSeq == Seq("valUE"))
   }
 
   test("getInt") {
-    val options = new DataSourceOptions(Map("numFOo" -> "1", "foo" -> "bar").asJava)
+    val options = new CaseInsensitiveStringMap(Map("numFOo" -> "1", "foo" -> "bar").asJava)
     assert(options.getInt("numFOO", 10) == 1)
     assert(options.getInt("numFOO2", 10) == 10)
 
@@ -49,17 +47,20 @@ class DataSourceOptionsSuite extends SparkFunSuite {
   }
 
   test("getBoolean") {
-    val options = new DataSourceOptions(
+    val options = new CaseInsensitiveStringMap(
       Map("isFoo" -> "true", "isFOO2" -> "false", "foo" -> "bar").asJava)
     assert(options.getBoolean("isFoo", false))
     assert(!options.getBoolean("isFoo2", true))
     assert(options.getBoolean("isBar", true))
     assert(!options.getBoolean("isBar", false))
-    assert(!options.getBoolean("FOO", true))
+
+    intercept[IllegalArgumentException] {
+      options.getBoolean("FOO", true)
+    }
   }
 
   test("getLong") {
-    val options = new DataSourceOptions(Map("numFoo" -> "9223372036854775807",
+    val options = new CaseInsensitiveStringMap(Map("numFoo" -> "9223372036854775807",
       "foo" -> "bar").asJava)
     assert(options.getLong("numFOO", 0L) == 9223372036854775807L)
     assert(options.getLong("numFoo2", -1L) == -1L)
@@ -70,7 +71,7 @@ class DataSourceOptionsSuite extends SparkFunSuite {
   }
 
   test("getDouble") {
-    val options = new DataSourceOptions(Map("numFoo" -> "922337.1",
+    val options = new CaseInsensitiveStringMap(Map("numFoo" -> "922337.1",
       "foo" -> "bar").asJava)
     assert(options.getDouble("numFOO", 0d) == 922337.1d)
     assert(options.getDouble("numFoo2", -1.02d) == -1.02d)
@@ -79,29 +80,4 @@ class DataSourceOptionsSuite extends SparkFunSuite {
       options.getDouble("foo", 0.1d)
     }
   }
-
-  test("standard options") {
-    val options = new DataSourceOptions(Map(
-      DataSourceOptions.PATH_KEY -> "abc",
-      DataSourceOptions.TABLE_KEY -> "tbl").asJava)
-
-    assert(options.paths().toSeq == Seq("abc"))
-    assert(options.tableName().get() == "tbl")
-    assert(!options.databaseName().isPresent)
-  }
-
-  test("standard options with both singular path and multi-paths") {
-    val options = new DataSourceOptions(Map(
-      DataSourceOptions.PATH_KEY -> "abc",
-      DataSourceOptions.PATHS_KEY -> """["c", "d"]""").asJava)
-
-    assert(options.paths().toSeq == Seq("abc", "c", "d"))
-  }
-
-  test("standard options with only multi-paths") {
-    val options = new DataSourceOptions(Map(
-      DataSourceOptions.PATHS_KEY -> """["c", "d\"e"]""").asJava)
-
-    assert(options.paths().toSeq == Seq("c", "d\"e"))
-  }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
deleted file mode 100644
index 00af0bf..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Stream;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.apache.spark.annotation.Evolving;
-
-/**
- * An immutable string-to-string map in which keys are case-insensitive. This is used to represent
- * data source options.
- *
- * Each data source implementation can define its own options and teach its users how to set them.
- * Spark doesn't have any restrictions about what options a data source should or should not have.
- * Instead Spark defines some standard options that data sources can optionally adopt. It's possible
- * that some options are very common and many data sources use them. However different data
- * sources may define the common options(key and meaning) differently, which is quite confusing to
- * end users.
- *
- * The standard options defined by Spark:
- * <table summary="standard data source options">
- *   <tr>
- *     <th><b>Option key</b></th>
- *     <th><b>Option value</b></th>
- *   </tr>
- *   <tr>
- *     <td>path</td>
- *     <td>A path string of the data files/directories, like
- *     <code>path1</code>, <code>/absolute/file2</code>, <code>path3/*</code>. The path can
- *     either be relative or absolute, points to either file or directory, and can contain
- *     wildcards. This option is commonly used by file-based data sources.</td>
- *   </tr>
- *   <tr>
- *     <td>paths</td>
- *     <td>A JSON array style paths string of the data files/directories, like
- *     <code>["path1", "/absolute/file2"]</code>. The format of each path is same as the
- *     <code>path</code> option, plus it should follow JSON string literal format, e.g. quotes
- *     should be escaped, <code>pa\"th</code> means pa"th.
- *     </td>
- *   </tr>
- *   <tr>
- *     <td>table</td>
- *     <td>A table name string representing the table name directly without any interpretation.
- *     For example, <code>db.tbl</code> means a table called db.tbl, not a table called tbl
- *     inside database db. <code>`t*b.l`</code> means a table called `t*b.l`, not t*b.l.</td>
- *   </tr>
- *   <tr>
- *     <td>database</td>
- *     <td>A database name string representing the database name directly without any
- *     interpretation, which is very similar to the table name option.</td>
- *   </tr>
- * </table>
- */
-@Evolving
-public class DataSourceOptions {
-  private final Map<String, String> keyLowerCasedMap;
-
-  private String toLowerCase(String key) {
-    return key.toLowerCase(Locale.ROOT);
-  }
-
-  public static DataSourceOptions empty() {
-    return new DataSourceOptions(new HashMap<>());
-  }
-
-  public DataSourceOptions(Map<String, String> originalMap) {
-    keyLowerCasedMap = new HashMap<>(originalMap.size());
-    for (Map.Entry<String, String> entry : originalMap.entrySet()) {
-      keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue());
-    }
-  }
-
-  public Map<String, String> asMap() {
-    return new HashMap<>(keyLowerCasedMap);
-  }
-
-  /**
-   * Returns the option value to which the specified key is mapped, case-insensitively.
-   */
-  public Optional<String> get(String key) {
-    return Optional.ofNullable(keyLowerCasedMap.get(toLowerCase(key)));
-  }
-
-  /**
-   * Returns the boolean value to which the specified key is mapped,
-   * or defaultValue if there is no mapping for the key. The key match is case-insensitive
-   */
-  public boolean getBoolean(String key, boolean defaultValue) {
-    String lcaseKey = toLowerCase(key);
-    return keyLowerCasedMap.containsKey(lcaseKey) ?
-      Boolean.parseBoolean(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
-  }
-
-  /**
-   * Returns the integer value to which the specified key is mapped,
-   * or defaultValue if there is no mapping for the key. The key match is case-insensitive
-   */
-  public int getInt(String key, int defaultValue) {
-    String lcaseKey = toLowerCase(key);
-    return keyLowerCasedMap.containsKey(lcaseKey) ?
-      Integer.parseInt(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
-  }
-
-  /**
-   * Returns the long value to which the specified key is mapped,
-   * or defaultValue if there is no mapping for the key. The key match is case-insensitive
-   */
-  public long getLong(String key, long defaultValue) {
-    String lcaseKey = toLowerCase(key);
-    return keyLowerCasedMap.containsKey(lcaseKey) ?
-      Long.parseLong(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
-  }
-
-  /**
-   * Returns the double value to which the specified key is mapped,
-   * or defaultValue if there is no mapping for the key. The key match is case-insensitive
-   */
-  public double getDouble(String key, double defaultValue) {
-    String lcaseKey = toLowerCase(key);
-    return keyLowerCasedMap.containsKey(lcaseKey) ?
-      Double.parseDouble(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
-  }
-
-  /**
-   * The option key for singular path.
-   */
-  public static final String PATH_KEY = "path";
-
-  /**
-   * The option key for multiple paths.
-   */
-  public static final String PATHS_KEY = "paths";
-
-  /**
-   * The option key for table name.
-   */
-  public static final String TABLE_KEY = "table";
-
-  /**
-   * The option key for database name.
-   */
-  public static final String DATABASE_KEY = "database";
-
-  /**
-   * The option key for whether to check existence of files for a table.
-   */
-  public static final String CHECK_FILES_EXIST_KEY = "check_files_exist";
-
-  /**
-   * Returns all the paths specified by both the singular path option and the multiple
-   * paths option.
-   */
-  public String[] paths() {
-    String[] singularPath =
-      get(PATH_KEY).map(s -> new String[]{s}).orElseGet(() -> new String[0]);
-    Optional<String> pathsStr = get(PATHS_KEY);
-    if (pathsStr.isPresent()) {
-      ObjectMapper objectMapper = new ObjectMapper();
-      try {
-        String[] paths = objectMapper.readValue(pathsStr.get(), String[].class);
-        return Stream.of(singularPath, paths).flatMap(Stream::of).toArray(String[]::new);
-      } catch (IOException e) {
-        return singularPath;
-      }
-    } else {
-      return singularPath;
-    }
-  }
-
-  /**
-   * Returns the value of the table name option.
-   */
-  public Optional<String> tableName() {
-    return get(TABLE_KEY);
-  }
-
-  /**
-   * Returns the value of the database name option.
-   */
-  public Optional<String> databaseName() {
-    return get(DATABASE_KEY);
-  }
-
-  public Boolean checkFilesExist() {
-    Optional<String> result = get(CHECK_FILES_EXIST_KEY);
-    return result.isPresent() && result.get().equals("true");
-  }
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java
index 6c5a95d..ea7c5d2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java
@@ -20,13 +20,14 @@ package org.apache.spark.sql.sources.v2;
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.sources.v2.reader.Scan;
 import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * An empty mix-in interface for {@link Table}, to indicate this table supports batch scan.
  * <p>
  * If a {@link Table} implements this interface, the
- * {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that
- * builds {@link Scan} with {@link Scan#toBatch()} implemented.
+ * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder}
+ * that builds {@link Scan} with {@link Scan#toBatch()} implemented.
  * </p>
  */
 @Evolving
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
index b2cd97a..09e23f8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
@@ -19,13 +19,14 @@ package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * An empty mix-in interface for {@link Table}, to indicate this table supports batch write.
  * <p>
  * If a {@link Table} implements this interface, the
- * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
- * with {@link WriteBuilder#buildForBatch()} implemented.
+ * {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)} must return a
+ * {@link WriteBuilder} with {@link WriteBuilder#buildForBatch()} implemented.
  * </p>
  */
 @Evolving
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
index b7fa3f2..5cc9848 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
@@ -20,14 +20,15 @@ package org.apache.spark.sql.sources.v2;
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.sources.v2.reader.Scan;
 import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with
  * continuous mode.
  * <p>
  * If a {@link Table} implements this interface, the
- * {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that
- * builds {@link Scan} with {@link Scan#toContinuousStream(String)} implemented.
+ * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder}
+ * that builds {@link Scan} with {@link Scan#toContinuousStream(String)} implemented.
  * </p>
  */
 @Evolving
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java
index 9408e32..c98f3f1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java
@@ -20,14 +20,15 @@ package org.apache.spark.sql.sources.v2;
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.sources.v2.reader.Scan;
 import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with
  * micro-batch mode.
  * <p>
  * If a {@link Table} implements this interface, the
- * {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that
- * builds {@link Scan} with {@link Scan#toMicroBatchStream(String)} implemented.
+ * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder}
+ * that builds {@link Scan} with {@link Scan#toMicroBatchStream(String)} implemented.
  * </p>
  */
 @Evolving
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
index 5031c71..14990ef 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
@@ -19,11 +19,12 @@ package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.sql.sources.v2.reader.Scan;
 import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * An internal base interface of mix-in interfaces for readable {@link Table}. This adds
- * {@link #newScanBuilder(DataSourceOptions)} that is used to create a scan for batch, micro-batch,
- * or continuous processing.
+ * {@link #newScanBuilder(CaseInsensitiveStringMap)} that is used to create a scan for batch,
+ * micro-batch, or continuous processing.
  */
 interface SupportsRead extends Table {
 
@@ -34,5 +35,5 @@ interface SupportsRead extends Table {
    * @param options The options for reading, which is an immutable case-insensitive
    *                string-to-string map.
    */
-  ScanBuilder newScanBuilder(DataSourceOptions options);
+  ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
index 1050d35..ac11e48 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
@@ -20,13 +20,14 @@ package org.apache.spark.sql.sources.v2;
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * An empty mix-in interface for {@link Table}, to indicate this table supports streaming write.
  * <p>
  * If a {@link Table} implements this interface, the
- * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
- * with {@link WriteBuilder#buildForStreaming()} implemented.
+ * {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)} must return a
+ * {@link WriteBuilder} with {@link WriteBuilder#buildForStreaming()} implemented.
  * </p>
  */
 @Evolving
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java
index ecdfe20..f0d8e44 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java
@@ -19,10 +19,11 @@ package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.sql.sources.v2.writer.BatchWrite;
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * An internal base interface of mix-in interfaces for writable {@link Table}. This adds
- * {@link #newWriteBuilder(DataSourceOptions)} that is used to create a write
+ * {@link #newWriteBuilder(CaseInsensitiveStringMap)} that is used to create a write
  * for batch or streaming.
  */
 interface SupportsWrite extends Table {
@@ -31,5 +32,5 @@ interface SupportsWrite extends Table {
    * Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call
    * this method to configure each data source write.
    */
-  WriteBuilder newWriteBuilder(DataSourceOptions options);
+  WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options);
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
index a9b83b6..04ad8fd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
@@ -20,6 +20,7 @@ package org.apache.spark.sql.sources.v2;
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * The base interface for v2 data sources which don't have a real catalog. Implementations must
@@ -37,7 +38,7 @@ public interface TableProvider {
    * @param options the user-specified options that can identify a table, e.g. file path, Kafka
    *                topic name, etc. It's an immutable case-insensitive string-to-string map.
    */
-  Table getTable(DataSourceOptions options);
+  Table getTable(CaseInsensitiveStringMap options);
 
   /**
    * Return a {@link Table} instance to do read/write with user-specified schema and options.
@@ -50,7 +51,7 @@ public interface TableProvider {
    * @param schema the user-specified schema.
    * @throws UnsupportedOperationException
    */
-  default Table getTable(DataSourceOptions options, StructType schema) {
+  default Table getTable(CaseInsensitiveStringMap options, StructType schema) {
     String name;
     if (this instanceof DataSourceRegister) {
       name = ((DataSourceRegister) this).shortName();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index a856258..2cc9370 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, FileTable}
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -176,7 +177,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   def load(path: String): DataFrame = {
     // force invocation of `load(...varargs...)`
-    option(DataSourceOptions.PATH_KEY, path).load(Seq.empty: _*)
+    option("path", path).load(Seq.empty: _*)
   }
 
   /**
@@ -206,20 +207,23 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         source = provider, conf = sparkSession.sessionState.conf)
-      val pathsOption = {
+      val pathsOption = if (paths.isEmpty) {
+        None
+      } else {
         val objectMapper = new ObjectMapper()
-        DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
+        Some("paths" -> objectMapper.writeValueAsString(paths.toArray))
       }
-      val checkFilesExistsOption = DataSourceOptions.CHECK_FILES_EXIST_KEY -> "true"
-      val finalOptions = sessionOptions ++ extraOptions.toMap + pathsOption + checkFilesExistsOption
-      val dsOptions = new DataSourceOptions(finalOptions.asJava)
+      // TODO SPARK-27113: remove this option.
+      val checkFilesExistsOpt = "check_files_exist" -> "true"
+      val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption + checkFilesExistsOpt
+      val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
       val table = userSpecifiedSchema match {
         case Some(schema) => provider.getTable(dsOptions, schema)
         case _ => provider.getTable(dsOptions)
       }
       table match {
         case _: SupportsBatchRead =>
-          Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, finalOptions))
+          Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, dsOptions))
 
         case _ => loadV1Source(paths: _*)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 8d4d60e..e58225e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
@@ -260,12 +261,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         provider, session.sessionState.conf)
-      val checkFilesExistsOption = DataSourceOptions.CHECK_FILES_EXIST_KEY -> "false"
+      // TODO SPARK-27113: remove this option.
+      val checkFilesExistsOption = "check_files_exist" -> "false"
       val options = sessionOptions ++ extraOptions + checkFilesExistsOption
-      val dsOptions = new DataSourceOptions(options.asJava)
+      val dsOptions = new CaseInsensitiveStringMap(options.asJava)
       provider.getTable(dsOptions) match {
         case table: SupportsBatchWrite =>
-          lazy val relation = DataSourceV2Relation.create(table, options)
+          lazy val relation = DataSourceV2Relation.create(table, dsOptions)
           mode match {
             case SaveMode.Append =>
               runCommand(df.sparkSession, "save") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
index e22d6a6..7c72495 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -33,10 +35,15 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
  */
 class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case i @ InsertIntoTable(d @DataSourceV2Relation(table: OrcTable, _, _), _, _, _, _) =>
+    case i @ InsertIntoTable(d @ DataSourceV2Relation(table: OrcTable, _, _), _, _, _, _) =>
       val v1FileFormat = new OrcFileFormat
-      val relation = HadoopFsRelation(table.fileIndex, table.fileIndex.partitionSchema,
-        table.schema(), None, v1FileFormat, d.options)(sparkSession)
+      val relation = HadoopFsRelation(
+        table.fileIndex,
+        table.fileIndex.partitionSchema,
+        table.schema(),
+        None,
+        v1FileFormat,
+        d.options.asScala.toMap)(sparkSession)
       i.copy(table = LogicalRelation(relation))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 22a74e3..aa2a5e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * This is no-op datasource. It does not do anything besides consuming its input.
@@ -31,11 +32,11 @@ import org.apache.spark.sql.types.StructType
  */
 class NoopDataSource extends TableProvider with DataSourceRegister {
   override def shortName(): String = "noop"
-  override def getTable(options: DataSourceOptions): Table = NoopTable
+  override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable
 }
 
 private[noop] object NoopTable extends Table with SupportsBatchWrite with SupportsStreamingWrite {
-  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = NoopWriteBuilder
+  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
   override def name(): String = "noop-table"
   override def schema(): StructType = new StructType()
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
index c8542bf..2081af3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchRead, SupportsBatchWrite, Table}
+import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
 
 object DataSourceV2Implicits {
   implicit class TableHelper(table: Table) {
@@ -42,8 +40,4 @@ object DataSourceV2Implicits {
       }
     }
   }
-
-  implicit class OptionsHelper(options: Map[String, String]) {
-    def toDataSourceOptions: DataSourceOptions = new DataSourceOptions(options.asJava)
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 891694b..1740782 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{Statistics => V2Statistics, _}
 import org.apache.spark.sql.sources.v2.reader.streaming.{Offset, SparkDataStream}
 import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * A logical plan representing a data source v2 table.
@@ -36,7 +37,7 @@ import org.apache.spark.sql.sources.v2.writer._
 case class DataSourceV2Relation(
     table: Table,
     output: Seq[AttributeReference],
-    options: Map[String, String])
+    options: CaseInsensitiveStringMap)
   extends LeafNode with MultiInstanceRelation with NamedRelation {
 
   import DataSourceV2Implicits._
@@ -48,7 +49,7 @@ case class DataSourceV2Relation(
   }
 
   def newScanBuilder(): ScanBuilder = {
-    table.asBatchReadable.newScanBuilder(options.toDataSourceOptions)
+    table.asBatchReadable.newScanBuilder(options)
   }
 
   override def computeStats(): Statistics = {
@@ -96,7 +97,7 @@ case class StreamingDataSourceV2Relation(
 }
 
 object DataSourceV2Relation {
-  def create(table: Table, options: Map[String, String]): DataSourceV2Relation = {
+  def create(table: Table, options: CaseInsensitiveStringMap): DataSourceV2Relation = {
     val output = table.schema().toAttributes
     DataSourceV2Relation(table, output, options)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index bf60626..424fbed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -148,8 +148,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
 
     case AppendData(r: DataSourceV2Relation, query, _) =>
-      AppendDataExec(
-        r.table.asBatchWritable, r.options.toDataSourceOptions, planLater(query)) :: Nil
+      AppendDataExec(r.table.asBatchWritable, r.options, planLater(query)) :: Nil
 
     case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, _) =>
       // fail if any filter cannot be converted. correctness depends on removing all matching data.
@@ -159,11 +158,10 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
       }.toArray
 
       OverwriteByExpressionExec(
-        r.table.asBatchWritable, filters, r.options.toDataSourceOptions, planLater(query)) :: Nil
+        r.table.asBatchWritable, filters, r.options, planLater(query)) :: Nil
 
     case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) =>
-      OverwritePartitionsDynamicExec(r.table.asBatchWritable,
-        r.options.toDataSourceOptions, planLater(query)) :: Nil
+      OverwritePartitionsDynamicExec(r.table.asBatchWritable, r.options, planLater(query)) :: Nil
 
     case WriteToContinuousDataSource(writer, query) =>
       WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index 06c5706..e9c7a1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -16,10 +16,13 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
+import com.fasterxml.jackson.databind.ObjectMapper
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.TableProvider
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * A base interface for data source v2 implementations of the built-in file-based data sources.
@@ -35,4 +38,13 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
   def fallBackFileFormat: Class[_ <: FileFormat]
 
   lazy val sparkSession = SparkSession.active
+
+  protected def getPaths(map: CaseInsensitiveStringMap): Seq[String] = {
+    val objectMapper = new ObjectMapper()
+    Option(map.get("paths")).map { pathStr =>
+      objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
+    }.getOrElse {
+      Option(map.get("path")).toSeq
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 21d3e5e..08873a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -22,23 +22,27 @@ import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchRead, SupportsBatchWrite, Table}
+import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 abstract class FileTable(
     sparkSession: SparkSession,
-    options: DataSourceOptions,
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
     userSpecifiedSchema: Option[StructType])
   extends Table with SupportsBatchRead with SupportsBatchWrite {
+
   lazy val fileIndex: PartitioningAwareFileIndex = {
-    val filePaths = options.paths()
-    val hadoopConf =
-      sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
-    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf,
-      checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist())
+    val scalaMap = options.asScala.toMap
+    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(scalaMap)
+    // This is an internal config so must be present.
+    val checkFilesExist = options.get("check_files_exist").toBoolean
+    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
+      checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
     val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-    new InMemoryFileIndex(sparkSession, rootPathsSpecified,
-      options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache)
+    new InMemoryFileIndex(
+      sparkSession, rootPathsSpecified, scalaMap, userSpecifiedSchema, fileStatusCache)
   }
 
   lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
index 75c9224..e16ee4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
@@ -33,12 +33,12 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsSaveMode, WriteBuilder}
 import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.SerializableConfiguration
 
-abstract class FileWriteBuilder(options: DataSourceOptions)
+abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String])
   extends WriteBuilder with SupportsSaveMode {
   private var schema: StructType = _
   private var queryId: String = _
@@ -61,18 +61,17 @@ abstract class FileWriteBuilder(options: DataSourceOptions)
 
   override def buildForBatch(): BatchWrite = {
     validateInputs()
-    val pathName = options.paths().head
-    val path = new Path(pathName)
+    val path = new Path(paths.head)
     val sparkSession = SparkSession.active
-    val optionsAsScala = options.asMap().asScala.toMap
+    val optionsAsScala = options.asScala.toMap
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
     val job = getJobInstance(hadoopConf, path)
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
-      outputPath = pathName)
+      outputPath = paths.head)
     lazy val description =
-      createWriteJobDescription(sparkSession, hadoopConf, job, pathName, optionsAsScala)
+      createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, optionsAsScala)
 
     val fs = path.getFileSystem(hadoopConf)
     mode match {
@@ -127,7 +126,7 @@ abstract class FileWriteBuilder(options: DataSourceOptions)
     assert(schema != null, "Missing input data schema")
     assert(queryId != null, "Missing query ID")
     assert(mode != null, "Missing save mode")
-    assert(options.paths().length == 1)
+    assert(paths.length == 1)
     DataSource.validateSchema(schema)
     schema.foreach { field =>
       if (!supportsDataType(field.dataType)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index d7cb245..51606ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -31,8 +31,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchWrite}
+import org.apache.spark.sql.sources.v2.SupportsBatchWrite
 import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsSaveMode, SupportsTruncate, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.{LongAccumulator, Utils}
 
 /**
@@ -53,7 +54,7 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan)
  */
 case class AppendDataExec(
     table: SupportsBatchWrite,
-    writeOptions: DataSourceOptions,
+    writeOptions: CaseInsensitiveStringMap,
     query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
 
   override protected def doExecute(): RDD[InternalRow] = {
@@ -81,7 +82,7 @@ case class AppendDataExec(
 case class OverwriteByExpressionExec(
     table: SupportsBatchWrite,
     deleteWhere: Array[Filter],
-    writeOptions: DataSourceOptions,
+    writeOptions: CaseInsensitiveStringMap,
     query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
 
   private def isTruncate(filters: Array[Filter]): Boolean = {
@@ -118,7 +119,7 @@ case class OverwriteByExpressionExec(
  */
 case class OverwritePartitionsDynamicExec(
     table: SupportsBatchWrite,
-    writeOptions: DataSourceOptions,
+    writeOptions: CaseInsensitiveStringMap,
     query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
 
   override protected def doExecute(): RDD[InternalRow] = {
@@ -139,12 +140,9 @@ case class OverwritePartitionsDynamicExec(
 
 case class WriteToDataSourceV2Exec(
     batchWrite: BatchWrite,
-    query: SparkPlan
-  ) extends V2TableWriteExec {
+    query: SparkPlan) extends V2TableWriteExec {
 
-  import DataSourceV2Implicits._
-
-  def writeOptions: DataSourceOptions = Map.empty[String, String].toDataSourceOptions
+  def writeOptions: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()
 
   override protected def doExecute(): RDD[InternalRow] = {
     doWrite(batchWrite)
@@ -157,7 +155,7 @@ case class WriteToDataSourceV2Exec(
 trait BatchWriteHelper {
   def table: SupportsBatchWrite
   def query: SparkPlan
-  def writeOptions: DataSourceOptions
+  def writeOptions: CaseInsensitiveStringMap
 
   def newWriteBuilder(): WriteBuilder = {
     table.newWriteBuilder(writeOptions)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
index f279af4..900c94e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.execution.datasources.v2.orc
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, Table}
+import org.apache.spark.sql.sources.v2.Table
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class OrcDataSourceV2 extends FileDataSourceV2 {
 
@@ -28,18 +29,20 @@ class OrcDataSourceV2 extends FileDataSourceV2 {
 
   override def shortName(): String = "orc"
 
-  private def getTableName(options: DataSourceOptions): String = {
-    shortName() + ":" + options.paths().mkString(";")
+  private def getTableName(paths: Seq[String]): String = {
+    shortName() + ":" + paths.mkString(";")
   }
 
-  override def getTable(options: DataSourceOptions): Table = {
-    val tableName = getTableName(options)
-    OrcTable(tableName, sparkSession, options, None)
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    val paths = getPaths(options)
+    val tableName = getTableName(paths)
+    OrcTable(tableName, sparkSession, options, paths, None)
   }
 
-  override def getTable(options: DataSourceOptions, schema: StructType): Table = {
-    val tableName = getTableName(options)
-    OrcTable(tableName, sparkSession, options, Some(schema))
+  override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
+    val paths = getPaths(options)
+    val tableName = getTableName(paths)
+    OrcTable(tableName, sparkSession, options, paths, Some(schema))
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
index eb27bbd..0b15341 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
@@ -26,18 +26,17 @@ import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.orc.OrcFilters
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.Scan
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 case class OrcScanBuilder(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
     schema: StructType,
     dataSchema: StructType,
-    options: DataSourceOptions) extends FileScanBuilder(schema) {
-  lazy val hadoopConf =
-    sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
+    options: CaseInsensitiveStringMap) extends FileScanBuilder(schema) {
+  lazy val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap)
 
   override def build(): Scan = {
     OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
index 249df8b..aac38fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
@@ -21,22 +21,24 @@ import org.apache.hadoop.fs.FileStatus
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.orc.OrcUtils
 import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 case class OrcTable(
     name: String,
     sparkSession: SparkSession,
-    options: DataSourceOptions,
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
     userSpecifiedSchema: Option[StructType])
-  extends FileTable(sparkSession, options, userSpecifiedSchema) {
-  override def newScanBuilder(options: DataSourceOptions): OrcScanBuilder =
+  extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): OrcScanBuilder =
     new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     OrcUtils.readSchema(sparkSession, files)
 
-  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder =
-    new OrcWriteBuilder(options)
+  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
+    new OrcWriteBuilder(options, paths)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala
index 1aec4d8..829ab5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala
@@ -25,10 +25,12 @@ import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFac
 import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcOutputWriter, OrcUtils}
 import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class OrcWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String])
+  extends FileWriteBuilder(options, paths) {
 
-class OrcWriteBuilder(options: DataSourceOptions) extends FileWriteBuilder(options) {
   override def prepareWrite(
       sqlConf: SQLConf,
       job: Job,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index bedcb9f..fdd80cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -95,9 +95,8 @@ class MicroBatchExecution(
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
           nextSourceId += 1
           logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]")
-          val dsOptions = new DataSourceOptions(options.asJava)
           // TODO: operator pushdown.
-          val scan = table.newScanBuilder(dsOptions).build()
+          val scan = table.newScanBuilder(options).build()
           val stream = scan.toMicroBatchStream(metadataPath)
           StreamingDataSourceV2Relation(output, scan, stream)
         })
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 180a23c..cc44193 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -40,10 +40,11 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
+import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
 import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
 /** States for [[StreamExecution]]'s lifecycle. */
@@ -584,7 +585,7 @@ abstract class StreamExecution(
       table: SupportsStreamingWrite,
       options: Map[String, String],
       inputPlan: LogicalPlan): StreamingWrite = {
-    val writeBuilder = table.newWriteBuilder(new DataSourceOptions(options.asJava))
+    val writeBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(options.asJava))
       .withQueryId(id.toString)
       .withInputDataSchema(inputPlan.schema)
     outputMode match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 1b7aa54..0d7e9ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.sources.v2.{Table, TableProvider}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
@@ -95,7 +96,7 @@ case class StreamingRelationV2(
     source: TableProvider,
     sourceName: String,
     table: Table,
-    extraOptions: Map[String, String],
+    extraOptions: CaseInsensitiveStringMap,
     output: Seq[Attribute],
     v1Relation: Option[StreamingRelation])(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 923bd74..dbdfcf8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer.{SupportsTruncate, WriteBuilder}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
   extends BaseRelation {
@@ -34,7 +35,7 @@ class ConsoleSinkProvider extends TableProvider
   with DataSourceRegister
   with CreatableRelationProvider {
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     ConsoleTable
   }
 
@@ -62,7 +63,7 @@ object ConsoleTable extends Table with SupportsStreamingWrite {
 
   override def schema(): StructType = StructType(Nil)
 
-  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
     new WriteBuilder with SupportsTruncate {
       private var inputSchema: StructType = _
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index f55a45d..c8fb53d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
 import java.util.function.UnaryOperator
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.SparkEnv
@@ -33,7 +32,7 @@ import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsContinuousRead, SupportsStreamingWrite}
+import org.apache.spark.sql.sources.v2.{SupportsContinuousRead, SupportsStreamingWrite}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Clock
@@ -71,9 +70,8 @@ class ContinuousExecution(
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
           nextSourceId += 1
           logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]")
-          val dsOptions = new DataSourceOptions(options.asJava)
           // TODO: operator pushdown.
-          val scan = table.newScanBuilder(dsOptions).build()
+          val scan = table.newScanBuilder(options).build()
           val stream = scan.toContinuousStream(metadataPath)
           StreamingDataSourceV2Relation(output, scan, stream)
         })
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index 48ff70f..d55f71c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -23,17 +23,13 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming._
 
 case class RateStreamPartitionOffset(
    partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset
 
-class RateStreamContinuousStream(
-    rowsPerSecond: Long,
-    numPartitions: Int,
-    options: DataSourceOptions) extends ContinuousStream {
+class RateStreamContinuousStream(rowsPerSecond: Long, numPartitions: Int) extends ContinuousStream {
   implicit val defaultFormats: DefaultFormats = DefaultFormats
 
   val creationTime = System.currentTimeMillis()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
index e7bc713..2263b42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
@@ -34,9 +34,9 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.streaming.{Offset => _, _}
 import org.apache.spark.sql.execution.streaming.sources.TextSocketReader
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.RpcUtils
 
 
@@ -49,7 +49,7 @@ import org.apache.spark.util.RpcUtils
  * buckets and serves the messages to the executors via a RPC endpoint.
  */
 class TextSocketContinuousStream(
-    host: String, port: Int, numPartitions: Int, options: DataSourceOptions)
+    host: String, port: Int, numPartitions: Int, options: CaseInsensitiveStringMap)
   extends ContinuousStream with Logging {
 
   implicit val defaultFormats: DefaultFormats = DefaultFormats
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index e71f81c..df7990c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream, Offset => OffsetV2}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 object MemoryStream {
   protected val currentBlockId = new AtomicInteger(0)
@@ -73,7 +74,7 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Bas
       MemoryStreamTableProvider,
       "memory",
       new MemoryStreamTable(this),
-      Map.empty,
+      CaseInsensitiveStringMap.empty(),
       attributes,
       None)(sqlContext.sparkSession)
   }
@@ -84,7 +85,7 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Bas
 // This class is used to indicate the memory stream data source. We don't actually use it, as
 // memory stream is for test only and we never look it up by name.
 object MemoryStreamTableProvider extends TableProvider {
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     throw new IllegalStateException("MemoryStreamTableProvider should not be used.")
   }
 }
@@ -96,7 +97,7 @@ class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table
 
   override def schema(): StructType = stream.fullSchema()
 
-  override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
     new MemoryStreamScanBuilder(stream)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala
index f2ff30b..dbe2427 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala
@@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.streaming.sources
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
 import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /** Common methods used to create writes for the the console sink */
-class ConsoleWrite(schema: StructType, options: DataSourceOptions)
+class ConsoleWrite(schema: StructType, options: CaseInsensitiveStringMap)
     extends StreamingWrite with Logging {
 
   // Number of rows to display, by default 20 rows
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index c0ae44a..44516bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -22,10 +22,11 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.python.PythonForeachWriter
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, Table}
+import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, Table}
 import org.apache.spark.sql.sources.v2.writer.{DataWriter, SupportsTruncate, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * A write-only table for forwarding data into the specified [[ForeachWriter]].
@@ -44,7 +45,7 @@ case class ForeachWriterTable[T](
 
   override def schema(): StructType = StructType(Nil)
 
-  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
     new WriteBuilder with SupportsTruncate {
       private var inputSchema: StructType = _
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
index a8feed3..5403eaf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
@@ -28,9 +28,9 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.{ManualClock, SystemClock}
 
 class RateStreamMicroBatchStream(
@@ -38,7 +38,7 @@ class RateStreamMicroBatchStream(
     // The default values here are used in tests.
     rampUpTimeSeconds: Long = 0,
     numPartitions: Int = 1,
-    options: DataSourceOptions,
+    options: CaseInsensitiveStringMap,
     checkpointLocation: String)
   extends MicroBatchStream with Logging {
   import RateStreamProvider._
@@ -155,7 +155,7 @@ class RateStreamMicroBatchStream(
 
   override def toString: String = s"RateStreamV2[rowsPerSecond=$rowsPerSecond, " +
     s"rampUpTimeSeconds=$rampUpTimeSeconds, " +
-    s"numPartitions=${options.get(NUM_PARTITIONS).orElse("default")}"
+    s"numPartitions=${options.getOrDefault(NUM_PARTITIONS, "default")}"
 }
 
 case class RateStreamMicroBatchInputPartition(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index 3a00825..3d8a90e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  *  A source that generates increment long values with timestamps. Each generated row has two
@@ -43,14 +44,14 @@ import org.apache.spark.sql.types._
 class RateStreamProvider extends TableProvider with DataSourceRegister {
   import RateStreamProvider._
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     val rowsPerSecond = options.getLong(ROWS_PER_SECOND, 1)
     if (rowsPerSecond <= 0) {
       throw new IllegalArgumentException(
         s"Invalid value '$rowsPerSecond'. The option 'rowsPerSecond' must be positive")
     }
 
-    val rampUpTimeSeconds = Option(options.get(RAMP_UP_TIME).orElse(null))
+    val rampUpTimeSeconds = Option(options.get(RAMP_UP_TIME))
       .map(JavaUtils.timeStringAsSec)
       .getOrElse(0L)
     if (rampUpTimeSeconds < 0) {
@@ -83,7 +84,7 @@ class RateStreamTable(
 
   override def schema(): StructType = RateStreamProvider.SCHEMA
 
-  override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
     override def build(): Scan = new Scan {
       override def readSchema(): StructType = RateStreamProvider.SCHEMA
 
@@ -93,7 +94,7 @@ class RateStreamTable(
       }
 
       override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
-        new RateStreamContinuousStream(rowsPerSecond, numPartitions, options)
+        new RateStreamContinuousStream(rowsPerSecond, numPartitions)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
index 540131c..9168d46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
@@ -29,7 +29,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming.LongOffset
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
 import org.apache.spark.unsafe.types.UTF8String
@@ -39,8 +38,7 @@ import org.apache.spark.unsafe.types.UTF8String
  * and debugging. This MicroBatchReadSupport will *not* work in production applications due to
  * multiple reasons, including no support for fault recovery.
  */
-class TextSocketMicroBatchStream(
-    host: String, port: Int, numPartitions: Int, options: DataSourceOptions)
+class TextSocketMicroBatchStream(host: String, port: Int, numPartitions: Int)
   extends MicroBatchStream with Logging {
 
   @GuardedBy("this")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index 8ac5bfc..0adbf1d9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -30,20 +30,21 @@ import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class TextSocketSourceProvider extends TableProvider with DataSourceRegister with Logging {
 
-  private def checkParameters(params: DataSourceOptions): Unit = {
+  private def checkParameters(params: CaseInsensitiveStringMap): Unit = {
     logWarning("The socket source should not be used for production applications! " +
       "It does not support recovery.")
-    if (!params.get("host").isPresent) {
+    if (!params.containsKey("host")) {
       throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
     }
-    if (!params.get("port").isPresent) {
+    if (!params.containsKey("port")) {
       throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
     }
     Try {
-      params.get("includeTimestamp").orElse("false").toBoolean
+      params.getBoolean("includeTimestamp", false)
     } match {
       case Success(_) =>
       case Failure(_) =>
@@ -51,10 +52,10 @@ class TextSocketSourceProvider extends TableProvider with DataSourceRegister wit
     }
   }
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     checkParameters(options)
     new TextSocketTable(
-      options.get("host").get,
+      options.get("host"),
       options.getInt("port", -1),
       options.getInt("numPartitions", SparkSession.active.sparkContext.defaultParallelism),
       options.getBoolean("includeTimestamp", false))
@@ -77,12 +78,12 @@ class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimest
     }
   }
 
-  override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
     override def build(): Scan = new Scan {
       override def readSchema(): StructType = schema()
 
       override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
-        new TextSocketMicroBatchStream(host, port, numPartitions, options)
+        new TextSocketMicroBatchStream(host, port, numPartitions)
       }
 
       override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 397c5ff..22adceb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
+import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
@@ -46,7 +47,7 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
 
   override def schema(): StructType = StructType(Nil)
 
-  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
     new WriteBuilder with SupportsTruncate {
       private var needTruncate: Boolean = false
       private var inputSchema: StructType = _
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 96b3a86..01f29cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRel
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
@@ -175,7 +176,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           source = provider, conf = sparkSession.sessionState.conf)
         val options = sessionOptions ++ extraOptions
-        val dsOptions = new DataSourceOptions(options.asJava)
+        val dsOptions = new CaseInsensitiveStringMap(options.asJava)
         val table = userSpecifiedSchema match {
           case Some(schema) => provider.getTable(dsOptions, schema)
           case _ => provider.getTable(dsOptions)
@@ -185,7 +186,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
             Dataset.ofRows(
               sparkSession,
               StreamingRelationV2(
-                provider, source, table, options, table.schema.toAttributes, v1Relation)(
+                provider, source, table, dsOptions, table.schema.toAttributes, v1Relation)(
                 sparkSession))
 
           // fallback to v1
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 9841994..33d032e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.execution.streaming.sources._
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, TableProvider}
+import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, TableProvider}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -313,7 +314,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           source = provider, conf = df.sparkSession.sessionState.conf)
         val options = sessionOptions ++ extraOptions
-        val dsOptions = new DataSourceOptions(options.asJava)
+        val dsOptions = new CaseInsensitiveStringMap(options.asJava)
         provider.getTable(dsOptions) match {
           case s: SupportsStreamingWrite => s
           case _ => createV1Sink()
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
index 2612b61..255a9f8 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
@@ -24,19 +24,19 @@ import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.GreaterThan;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.Table;
 import org.apache.spark.sql.sources.v2.TableProvider;
 import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 public class JavaAdvancedDataSourceV2 implements TableProvider {
 
   @Override
-  public Table getTable(DataSourceOptions options) {
+  public Table getTable(CaseInsensitiveStringMap options) {
     return new JavaSimpleBatchTable() {
       @Override
-      public ScanBuilder newScanBuilder(DataSourceOptions options) {
+      public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
         return new AdvancedScanBuilder();
       }
     };
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java
index d72ab53..699859c 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java
@@ -21,11 +21,11 @@ import java.io.IOException;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.Table;
 import org.apache.spark.sql.sources.v2.TableProvider;
 import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
@@ -49,10 +49,10 @@ public class JavaColumnarDataSourceV2 implements TableProvider {
   }
 
   @Override
-  public Table getTable(DataSourceOptions options) {
+  public Table getTable(CaseInsensitiveStringMap options) {
     return new JavaSimpleBatchTable() {
       @Override
-      public ScanBuilder newScanBuilder(DataSourceOptions options) {
+      public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
         return new MyScanBuilder();
       }
     };
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
index a513bfb..dfbea92 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
@@ -22,13 +22,13 @@ import java.util.Arrays;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.Table;
 import org.apache.spark.sql.sources.v2.TableProvider;
 import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution;
 import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
 import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 public class JavaPartitionAwareDataSource implements TableProvider {
 
@@ -54,10 +54,10 @@ public class JavaPartitionAwareDataSource implements TableProvider {
   }
 
   @Override
-  public Table getTable(DataSourceOptions options) {
+  public Table getTable(CaseInsensitiveStringMap options) {
     return new JavaSimpleBatchTable() {
       @Override
-      public ScanBuilder newScanBuilder(DataSourceOptions options) {
+      public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
         return new MyScanBuilder();
       }
     };
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaReportStatisticsDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaReportStatisticsDataSource.java
index bbc8492..f3755e1 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaReportStatisticsDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaReportStatisticsDataSource.java
@@ -19,13 +19,13 @@ package test.org.apache.spark.sql.sources.v2;
 
 import java.util.OptionalLong;
 
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.Table;
 import org.apache.spark.sql.sources.v2.TableProvider;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
 import org.apache.spark.sql.sources.v2.reader.Statistics;
 import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 public class JavaReportStatisticsDataSource implements TableProvider {
   class MyScanBuilder extends JavaSimpleScanBuilder implements SupportsReportStatistics {
@@ -54,10 +54,10 @@ public class JavaReportStatisticsDataSource implements TableProvider {
   }
 
   @Override
-  public Table getTable(DataSourceOptions options) {
+  public Table getTable(CaseInsensitiveStringMap options) {
     return new JavaSimpleBatchTable() {
       @Override
-      public ScanBuilder newScanBuilder(DataSourceOptions options) {
+      public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
         return new MyScanBuilder();
       }
     };
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
index 815d57ba..3800a94 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
@@ -17,11 +17,11 @@
 
 package test.org.apache.spark.sql.sources.v2;
 
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.Table;
 import org.apache.spark.sql.sources.v2.TableProvider;
 import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 public class JavaSchemaRequiredDataSource implements TableProvider {
 
@@ -45,7 +45,7 @@ public class JavaSchemaRequiredDataSource implements TableProvider {
   }
 
   @Override
-  public Table getTable(DataSourceOptions options, StructType schema) {
+  public Table getTable(CaseInsensitiveStringMap options, StructType schema) {
     return new JavaSimpleBatchTable() {
 
       @Override
@@ -54,14 +54,14 @@ public class JavaSchemaRequiredDataSource implements TableProvider {
       }
 
       @Override
-      public ScanBuilder newScanBuilder(DataSourceOptions options) {
+      public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
         return new MyScanBuilder(schema);
       }
     };
   }
 
   @Override
-  public Table getTable(DataSourceOptions options) {
+  public Table getTable(CaseInsensitiveStringMap options) {
     throw new IllegalArgumentException("requires a user-supplied schema");
   }
 }
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
index 852c454..7474f36 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
@@ -17,10 +17,10 @@
 
 package test.org.apache.spark.sql.sources.v2;
 
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.Table;
 import org.apache.spark.sql.sources.v2.TableProvider;
 import org.apache.spark.sql.sources.v2.reader.*;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 public class JavaSimpleDataSourceV2 implements TableProvider {
 
@@ -36,10 +36,10 @@ public class JavaSimpleDataSourceV2 implements TableProvider {
   }
 
   @Override
-  public Table getTable(DataSourceOptions options) {
+  public Table getTable(CaseInsensitiveStringMap options) {
     return new JavaSimpleBatchTable() {
       @Override
-      public ScanBuilder newScanBuilder(DataSourceOptions options) {
+      public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
         return new MyScanBuilder();
       }
     };
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index cccd8e9..034454d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsR
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
@@ -58,7 +57,7 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
       case PhysicalOperation(_, filters,
         DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
         assert(filters.nonEmpty, "No filter is analyzed from the given query")
-        val scanBuilder = orcTable.newScanBuilder(new DataSourceOptions(options.asJava))
+        val scanBuilder = orcTable.newScanBuilder(options)
         scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
         val pushedFilters = scanBuilder.pushedFilters()
         assert(pushedFilters.nonEmpty, "No filter is pushed down")
@@ -102,7 +101,7 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
       case PhysicalOperation(_, filters,
       DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
         assert(filters.nonEmpty, "No filter is analyzed from the given query")
-        val scanBuilder = orcTable.newScanBuilder(new DataSourceOptions(options.asJava))
+        val scanBuilder = orcTable.newScanBuilder(options)
         scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
         val pushedFilters = scanBuilder.pushedFilters()
         if (noneSupported) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index d0418f8..c04f6e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relati
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.streaming.Offset
 import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ManualClock
 
 class RateStreamProviderSuite extends StreamTest {
@@ -135,7 +135,7 @@ class RateStreamProviderSuite extends StreamTest {
     withTempDir { temp =>
       val stream = new RateStreamMicroBatchStream(
         rowsPerSecond = 100,
-        options = new DataSourceOptions(Map("useManualClock" -> "true").asJava),
+        options = new CaseInsensitiveStringMap(Map("useManualClock" -> "true").asJava),
         checkpointLocation = temp.getCanonicalPath)
       stream.clock.asInstanceOf[ManualClock].advance(100000)
       val startOffset = stream.initialOffset()
@@ -154,7 +154,7 @@ class RateStreamProviderSuite extends StreamTest {
     withTempDir { temp =>
       val stream = new RateStreamMicroBatchStream(
         rowsPerSecond = 20,
-        options = DataSourceOptions.empty(),
+        options = CaseInsensitiveStringMap.empty(),
         checkpointLocation = temp.getCanonicalPath)
       val partitions = stream.planInputPartitions(LongOffset(0L), LongOffset(1L))
       val readerFactory = stream.createReaderFactory()
@@ -173,7 +173,7 @@ class RateStreamProviderSuite extends StreamTest {
       val stream = new RateStreamMicroBatchStream(
         rowsPerSecond = 33,
         numPartitions = 11,
-        options = DataSourceOptions.empty(),
+        options = CaseInsensitiveStringMap.empty(),
         checkpointLocation = temp.getCanonicalPath)
       val partitions = stream.planInputPartitions(LongOffset(0L), LongOffset(1L))
       val readerFactory = stream.createReaderFactory()
@@ -309,8 +309,7 @@ class RateStreamProviderSuite extends StreamTest {
   }
 
   test("continuous data") {
-    val stream = new RateStreamContinuousStream(
-      rowsPerSecond = 20, numPartitions = 2, options = DataSourceOptions.empty())
+    val stream = new RateStreamContinuousStream(rowsPerSecond = 20, numPartitions = 2)
     val partitions = stream.planInputPartitions(stream.initialOffset)
     val readerFactory = stream.createContinuousReaderFactory()
     assert(partitions.size == 2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index e1769fb..a5ba4f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -35,11 +35,11 @@ import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relati
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.streaming.Offset
 import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
 
@@ -176,13 +176,13 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
   test("params not given") {
     val provider = new TextSocketSourceProvider
     intercept[AnalysisException] {
-      provider.getTable(new DataSourceOptions(Map.empty[String, String].asJava))
+      provider.getTable(CaseInsensitiveStringMap.empty())
     }
     intercept[AnalysisException] {
-      provider.getTable(new DataSourceOptions(Map("host" -> "localhost").asJava))
+      provider.getTable(new CaseInsensitiveStringMap(Map("host" -> "localhost").asJava))
     }
     intercept[AnalysisException] {
-      provider.getTable(new DataSourceOptions(Map("port" -> "1234").asJava))
+      provider.getTable(new CaseInsensitiveStringMap(Map("port" -> "1234").asJava))
     }
   }
 
@@ -190,7 +190,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
     val provider = new TextSocketSourceProvider
     val params = Map("host" -> "localhost", "port" -> "1234", "includeTimestamp" -> "fasle")
     intercept[AnalysisException] {
-      provider.getTable(new DataSourceOptions(params.asJava))
+      provider.getTable(new CaseInsensitiveStringMap(params.asJava))
     }
   }
 
@@ -201,7 +201,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
       StructField("area", StringType) :: Nil)
     val params = Map("host" -> "localhost", "port" -> "1234")
     val exception = intercept[UnsupportedOperationException] {
-      provider.getTable(new DataSourceOptions(params.asJava), userSpecifiedSchema)
+      provider.getTable(new CaseInsensitiveStringMap(params.asJava), userSpecifiedSchema)
     }
     assert(exception.getMessage.contains(
       "socket source does not support user-specified schema"))
@@ -299,7 +299,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
       host = "localhost",
       port = serverThread.port,
       numPartitions = 2,
-      options = DataSourceOptions.empty())
+      options = CaseInsensitiveStringMap.empty())
     val partitions = stream.planInputPartitions(stream.initialOffset())
     assert(partitions.length == 2)
 
@@ -351,7 +351,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
       host = "localhost",
       port = serverThread.port,
       numPartitions = 2,
-      options = DataSourceOptions.empty())
+      options = CaseInsensitiveStringMap.empty())
 
     stream.startOffset = TextSocketOffset(List(5, 5))
     assertThrows[IllegalStateException] {
@@ -367,7 +367,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
       host = "localhost",
       port = serverThread.port,
       numPartitions = 2,
-      options = new DataSourceOptions(Map("includeTimestamp" -> "true").asJava))
+      options = new CaseInsensitiveStringMap(Map("includeTimestamp" -> "true").asJava))
     val partitions = stream.planInputPartitions(stream.initialOffset())
     assert(partitions.size == 2)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index e184bf5..705559d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Distribution, Partitioning}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 class DataSourceV2Suite extends QueryTest with SharedSQLContext {
@@ -349,7 +350,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       val options = df.queryExecution.optimizedPlan.collectFirst {
         case d: DataSourceV2Relation => d.options
       }.get
-      assert(options(optionName) === "false")
+      assert(options.get(optionName) === "false")
     }
   }
 
@@ -437,8 +438,8 @@ class SimpleSinglePartitionSource extends TableProvider {
     }
   }
 
-  override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable {
-    override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
       new MyScanBuilder()
     }
   }
@@ -454,8 +455,8 @@ class SimpleDataSourceV2 extends TableProvider {
     }
   }
 
-  override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable {
-    override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
       new MyScanBuilder()
     }
   }
@@ -463,8 +464,8 @@ class SimpleDataSourceV2 extends TableProvider {
 
 class AdvancedDataSourceV2 extends TableProvider {
 
-  override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable {
-    override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
       new AdvancedScanBuilder()
     }
   }
@@ -559,16 +560,16 @@ class SchemaRequiredDataSource extends TableProvider {
     override def readSchema(): StructType = schema
   }
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     throw new IllegalArgumentException("requires a user-supplied schema")
   }
 
-  override def getTable(options: DataSourceOptions, schema: StructType): Table = {
+  override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val userGivenSchema = schema
     new SimpleBatchTable {
       override def schema(): StructType = userGivenSchema
 
-      override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
         new MyScanBuilder(userGivenSchema)
       }
     }
@@ -588,8 +589,8 @@ class ColumnarDataSourceV2 extends TableProvider {
     }
   }
 
-  override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable {
-    override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
       new MyScanBuilder()
     }
   }
@@ -659,8 +660,8 @@ class PartitionAwareDataSource extends TableProvider {
     override def outputPartitioning(): Partitioning = new MyPartitioning
   }
 
-  override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable {
-    override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
       new MyScanBuilder()
     }
   }
@@ -699,7 +700,7 @@ class SchemaReadAttemptException(m: String) extends RuntimeException(m)
 
 class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     new MyTable(options) {
       override def schema(): StructType = {
         throw new SchemaReadAttemptException("schema should not be read.")
@@ -725,9 +726,9 @@ class ReportStatisticsDataSource extends TableProvider {
     }
   }
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     new SimpleBatchTable {
-      override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
         new MyScanBuilder
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
index fd19a48..f9f9db3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
@@ -18,13 +18,14 @@ package org.apache.spark.sql.sources.v2
 
 import org.apache.spark.sql.{AnalysisException, QueryTest}
 import org.apache.spark.sql.execution.datasources.FileFormat
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetTest}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2.reader.ScanBuilder
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 {
 
@@ -32,7 +33,7 @@ class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 {
 
   override def shortName(): String = "parquet"
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     new DummyReadOnlyFileTable
   }
 }
@@ -42,7 +43,7 @@ class DummyReadOnlyFileTable extends Table with SupportsBatchRead {
 
   override def schema(): StructType = StructType(Nil)
 
-  override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
     throw new AnalysisException("Dummy file reader")
   }
 }
@@ -53,7 +54,7 @@ class DummyWriteOnlyFileDataSourceV2 extends FileDataSourceV2 {
 
   override def shortName(): String = "parquet"
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     new DummyWriteOnlyFileTable
   }
 }
@@ -63,7 +64,7 @@ class DummyWriteOnlyFileTable extends Table with SupportsBatchWrite {
 
   override def schema(): StructType = StructType(Nil)
 
-  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder =
+  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
     throw new AnalysisException("Dummy file writer")
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index c56a545..1603545 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -25,12 +25,12 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.SparkContext
-import org.apache.spark.internal.config.SPECULATION_ENABLED
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -141,22 +141,24 @@ class SimpleWritableDataSource extends TableProvider with SessionConfigSupport {
     }
   }
 
-  class MyTable(options: DataSourceOptions) extends SimpleBatchTable with SupportsBatchWrite {
-    private val path = options.get("path").get()
+  class MyTable(options: CaseInsensitiveStringMap)
+    extends SimpleBatchTable with SupportsBatchWrite {
+
+    private val path = options.get("path")
     private val conf = SparkContext.getActive.get.hadoopConfiguration
 
     override def schema(): StructType = tableSchema
 
-    override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
       new MyScanBuilder(new Path(path).toUri.toString, conf)
     }
 
-    override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+    override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
       new MyWriteBuilder(path)
     }
   }
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     new MyTable(options)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 3c2c700..13bb686 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.sources.v2.writer.{WriteBuilder, WriterCommitMessage
 import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.Utils
 
 class FakeDataStream extends MicroBatchStream with ContinuousStream {
@@ -76,19 +77,19 @@ class FakeWriteBuilder extends WriteBuilder with StreamingWrite {
 trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
   override def name(): String = "fake"
   override def schema(): StructType = StructType(Seq())
-  override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder
 }
 
 trait FakeContinuousReadTable extends Table with SupportsContinuousRead {
   override def name(): String = "fake"
   override def schema(): StructType = StructType(Seq())
-  override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder
 }
 
 trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite {
   override def name(): String = "fake"
   override def schema(): StructType = StructType(Seq())
-  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
     new FakeWriteBuilder
   }
 }
@@ -101,7 +102,7 @@ class FakeReadMicroBatchOnly
 
   override def keyPrefix: String = shortName()
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     LastReadOptions.options = options
     new FakeMicroBatchReadTable {}
   }
@@ -115,7 +116,7 @@ class FakeReadContinuousOnly
 
   override def keyPrefix: String = shortName()
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     LastReadOptions.options = options
     new FakeContinuousReadTable {}
   }
@@ -124,7 +125,7 @@ class FakeReadContinuousOnly
 class FakeReadBothModes extends DataSourceRegister with TableProvider {
   override def shortName(): String = "fake-read-microbatch-continuous"
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     new Table with FakeMicroBatchReadTable with FakeContinuousReadTable {}
   }
 }
@@ -132,7 +133,7 @@ class FakeReadBothModes extends DataSourceRegister with TableProvider {
 class FakeReadNeitherMode extends DataSourceRegister with TableProvider {
   override def shortName(): String = "fake-read-neither-mode"
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     new Table {
       override def name(): String = "fake"
       override def schema(): StructType = StructType(Nil)
@@ -148,7 +149,7 @@ class FakeWriteOnly
 
   override def keyPrefix: String = shortName()
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     LastWriteOptions.options = options
     new Table with FakeStreamingWriteTable {
       override def name(): String = "fake"
@@ -159,7 +160,7 @@ class FakeWriteOnly
 
 class FakeNoWrite extends DataSourceRegister with TableProvider {
   override def shortName(): String = "fake-write-neither-mode"
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     new Table {
       override def name(): String = "fake"
       override def schema(): StructType = StructType(Nil)
@@ -186,7 +187,7 @@ class FakeWriteSupportProviderV1Fallback extends DataSourceRegister
 
   override def shortName(): String = "fake-write-v1-fallback"
 
-  override def getTable(options: DataSourceOptions): Table = {
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
     new Table with FakeStreamingWriteTable {
       override def name(): String = "fake"
       override def schema(): StructType = StructType(Nil)
@@ -195,7 +196,7 @@ class FakeWriteSupportProviderV1Fallback extends DataSourceRegister
 }
 
 object LastReadOptions {
-  var options: DataSourceOptions = _
+  var options: CaseInsensitiveStringMap = _
 
   def clear(): Unit = {
     options = null
@@ -203,7 +204,7 @@ object LastReadOptions {
 }
 
 object LastWriteOptions {
-  var options: DataSourceOptions = _
+  var options: CaseInsensitiveStringMap = _
 
   def clear(): Unit = {
     options = null
@@ -320,7 +321,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
         testPositiveCaseWithQuery(readSource, writeSource, trigger) { _ =>
           eventually(timeout(streamingTimeout)) {
             // Write options should not be set.
-            assert(LastWriteOptions.options.getBoolean(readOptionName, false) == false)
+            assert(!LastWriteOptions.options.containsKey(readOptionName))
             assert(LastReadOptions.options.getBoolean(readOptionName, false))
           }
         }
@@ -331,7 +332,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
         testPositiveCaseWithQuery(readSource, writeSource, trigger) { _ =>
           eventually(timeout(streamingTimeout)) {
             // Read options should not be set.
-            assert(LastReadOptions.options.getBoolean(writeOptionName, false) == false)
+            assert(!LastReadOptions.options.containsKey(writeOptionName))
             assert(LastWriteOptions.options.getBoolean(writeOptionName, false))
           }
         }
@@ -351,10 +352,10 @@ class StreamingDataSourceV2Suite extends StreamTest {
   for ((read, write, trigger) <- cases) {
     testQuietly(s"stream with read format $read, write format $write, trigger $trigger") {
       val sourceTable = DataSource.lookupDataSource(read, spark.sqlContext.conf).getConstructor()
-        .newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty())
+        .newInstance().asInstanceOf[TableProvider].getTable(CaseInsensitiveStringMap.empty())
 
       val sinkTable = DataSource.lookupDataSource(write, spark.sqlContext.conf).getConstructor()
-        .newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty())
+        .newInstance().asInstanceOf[TableProvider].getTable(CaseInsensitiveStringMap.empty())
 
       (sourceTable, sinkTable, trigger) match {
         // Valid microbatch queries.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org