You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/01/15 16:33:37 UTC

[incubator-druid] branch master updated: Some fixes and tests for spaces/non-ASCII chars in datasource names (#6761)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8537a77  Some fixes and tests for spaces/non-ASCII chars in datasource names (#6761)
8537a77 is described below

commit 8537a771b00ce53cf6f2f4f9070eb3cd5d50d632
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Tue Jan 15 08:33:31 2019 -0800

    Some fixes and tests for spaces/non-ASCII chars in datasource names (#6761)
    
    * Fixes and tests for spaces/non-ASCII datasource names
    
    * Some unit test fixes
    
    * Fix ITRealtimeIndexTaskTest
    
    * Checkstyle
    
    * TeamCity
    
    * PR comments
---
 ci/travis_script_integration_part2.sh              |  2 +-
 .../apache/druid/java/util/common/StringUtils.java | 32 ++++++++-
 .../druid/java/util/common/StringUtilsTest.java    | 12 ++++
 .../druid/indexing/common/IndexTaskClient.java     |  6 +-
 .../overlord/http/security/TaskResourceFilter.java |  3 +-
 .../resources/indexer_static/js/console-0.0.1.js   |  2 +-
 .../indexing/overlord/TaskRunnerUtilsTest.java     |  4 +-
 integration-tests/docker/Dockerfile                |  4 +-
 integration-tests/docker/middlemanager.conf        |  2 +-
 integration-tests/pom.xml                          |  1 +
 integration-tests/run_cluster.sh                   | 26 ++++---
 .../druid/testing/ConfigFileConfigProvider.java    |  6 ++
 .../apache/druid/testing/DockerConfigProvider.java |  9 +++
 .../druid/testing/IntegrationTestingConfig.java    |  2 +
 .../clients/ClientInfoResourceTestClient.java      |  7 +-
 .../clients/CoordinatorResourceTestClient.java     |  8 +--
 .../clients/OverlordResourceTestClient.java        |  5 +-
 .../druid/testing/utils/TestQueryHelper.java       |  2 +
 .../tests/indexer/AbstractITBatchIndexTest.java    | 71 +++++++++++++++++--
 .../indexer/AbstractITRealtimeIndexTaskTest.java   | 11 ++-
 .../druid/tests/indexer/AbstractIndexerTest.java   |  4 ++
 .../druid/tests/indexer/ITCompactionTaskTest.java  | 80 ++++++++++++++++++----
 .../apache/druid/tests/indexer/ITIndexerTest.java  |  6 +-
 .../tests/indexer/ITKafkaIndexingServiceTest.java  | 18 +++--
 .../apache/druid/tests/indexer/ITKafkaTest.java    | 16 +++--
 .../tests/indexer/ITNestedQueryPushDownTest.java   | 39 ++++++++++-
 .../druid/tests/indexer/ITParallelIndexTest.java   |  2 +-
 .../druid/tests/indexer/ITUnionQueryTest.java      | 44 +++++++++---
 .../src/test/resources/indexer/union_queries.json  | 32 ++++-----
 .../indexer/wikipedia_compaction_task.json         |  2 +-
 .../resources/indexer/wikipedia_index_queries.json |  4 +-
 .../resources/indexer/wikipedia_index_task.json    |  2 +-
 .../indexer/wikipedia_parallel_index_queries.json  |  4 +-
 .../indexer/wikipedia_parallel_index_task.json     |  2 +-
 ...ipedia_realtime_appenderator_index_queries.json |  6 +-
 ...wikipedia_realtime_appenderator_index_task.json |  2 +-
 .../indexer/wikipedia_realtime_index_queries.json  |  6 +-
 .../indexer/wikipedia_realtime_index_task.json     |  2 +-
 .../resources/indexer/wikipedia_reindex_task.json  |  4 +-
 .../indexer/wikipedia_union_index_task.json        |  2 +-
 .../resources/indexer/wikiticker_index_task.json   | 22 +-----
 .../queries/nestedquerypushdown_queries.json       | 10 +--
 .../client/coordinator/CoordinatorClient.java      |  2 +-
 .../client/indexing/HttpIndexingServiceClient.java | 12 +++-
 .../realtime/firehose/ChatHandlerResource.java     |  3 +-
 .../jetty/TaskIdResponseHeaderFilterHolder.java    |  3 +-
 46 files changed, 405 insertions(+), 139 deletions(-)

diff --git a/ci/travis_script_integration_part2.sh b/ci/travis_script_integration_part2.sh
index 61b3b9b..24cd797 100755
--- a/ci/travis_script_integration_part2.sh
+++ b/ci/travis_script_integration_part2.sh
@@ -21,6 +21,6 @@ set -e
 
 pushd $TRAVIS_BUILD_DIR/integration-tests
 
-mvn verify -P integration-tests -Dit.test=ITUnionQueryTest,ITTwitterQueryTest,ITWikipediaQueryTest,ITBasicAuthConfigurationTest,ITTLSTest
+mvn verify -P integration-tests -Dit.test=ITUnionQueryTest,ITNestedQueryPushDownTest,ITTwitterQueryTest,ITWikipediaQueryTest,ITBasicAuthConfigurationTest,ITTLSTest
 
 popd
diff --git a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
index 85f71d8..2c72e0e 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
@@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
 
 import javax.annotation.Nullable;
 import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
@@ -153,10 +154,39 @@ public class StringUtils
     return s.toUpperCase(Locale.ENGLISH);
   }
 
+  /**
+   * Encodes a String in application/x-www-form-urlencoded format, with one exception:
+   * "+" in the encoded form is replaced with "%20".
+   *
+   * application/x-www-form-urlencoded encodes spaces as "+", but we use this to encode non-form data as well.
+   *
+   * @param s String to be encoded
+   * @return application/x-www-form-urlencoded format encoded String, but with "+" replaced with "%20".
+   */
+  @Nullable
   public static String urlEncode(String s)
   {
+    if (s == null) {
+      return null;
+    }
+
+    try {
+      return StringUtils.replace(URLEncoder.encode(s, "UTF-8"), "+", "%20");
+    }
+    catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Nullable
+  public static String urlDecode(String s)
+  {
+    if (s == null) {
+      return null;
+    }
+
     try {
-      return URLEncoder.encode(s, "UTF-8");
+      return URLDecoder.decode(s, "UTF-8");
     }
     catch (UnsupportedEncodingException e) {
       throw new RuntimeException(e);
diff --git a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
index 53f4942..69d209a 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
@@ -148,4 +148,16 @@ public class StringUtilsTest
     Assert.assertEquals("bb", StringUtils.replace("aaaa", "aa", "b"));
     Assert.assertEquals("", StringUtils.replace("aaaa", "aa", ""));
   }
+
+  @Test
+  public void testURLEncodeSpace()
+  {
+    String s1 = StringUtils.urlEncode("aaa bbb");
+    Assert.assertEquals(s1, "aaa%20bbb");
+    Assert.assertEquals("aaa bbb", StringUtils.urlDecode(s1));
+
+    String s2 = StringUtils.urlEncode("fff+ggg");
+    Assert.assertEquals(s2, "fff%2Bggg");
+    Assert.assertEquals("fff+ggg", StringUtils.urlDecode(s2));
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
index 1b65ad4..483795f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
@@ -291,7 +291,7 @@ public abstract class IndexTaskClient implements AutoCloseable
 
     final Request request = new Request(method, serviceUrl);
     // used to validate that we are talking to the correct worker
-    request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId);
+    request.addHeader(ChatHandlerResource.TASK_ID_HEADER, StringUtils.urlEncode(taskId));
     if (content.length > 0) {
       request.setContent(Preconditions.checkNotNull(mediaType, "mediaType"), content);
     }
@@ -369,7 +369,9 @@ public abstract class IndexTaskClient implements AutoCloseable
 
         final Duration delay;
         if (response != null && response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
-          String headerId = response.getResponse().headers().get(ChatHandlerResource.TASK_ID_HEADER);
+          String headerId = StringUtils.urlDecode(
+              response.getResponse().headers().get(ChatHandlerResource.TASK_ID_HEADER)
+          );
           if (headerId != null && !headerId.equals(taskId)) {
             log.warn(
                 "Expected worker to have taskId [%s] but has taskId [%s], will retry in [%d]s",
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java
index 8109961..af1f822 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java
@@ -64,7 +64,7 @@ public class TaskResourceFilter extends AbstractResourceFilter
   @Override
   public ContainerRequest filter(ContainerRequest request)
   {
-    final String taskId = Preconditions.checkNotNull(
+    String taskId = Preconditions.checkNotNull(
         request.getPathSegments()
                .get(
                    Iterables.indexOf(
@@ -80,6 +80,7 @@ public class TaskResourceFilter extends AbstractResourceFilter
                    ) + 1
                ).getPath()
     );
+    taskId = StringUtils.urlDecode(taskId);
 
     Optional<Task> taskOptional = taskStorageQueryAdapter.getTask(taskId);
     if (!taskOptional.isPresent()) {
diff --git a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js
index bda6094..db64b4f 100644
--- a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js
+++ b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js
@@ -123,7 +123,7 @@ $(document).ready(function() {
        '<span style="color:#FF6000">suspended</span>' :
        '<span style="color:#08B157">running</span>';
       data[i] = {
-        "dataSource" : supervisorId,
+        "dataSource" : dataList[i].id,
         "more" :
           '<a href="/druid/indexer/v1/supervisor/' + supervisorId + '">payload</a>' +
           '<a href="/druid/indexer/v1/supervisor/' + supervisorId + '/status">status</a>' +
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
index 7c02946..b21a98f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
@@ -35,8 +35,8 @@ public class TaskRunnerUtilsTest
         "/druid/worker/v1/task/%s/log",
         "foo bar&"
     );
-    Assert.assertEquals("https://1.2.3.4:8290/druid/worker/v1/task/foo+bar%26/log", url.toString());
+    Assert.assertEquals("https://1.2.3.4:8290/druid/worker/v1/task/foo%20bar%26/log", url.toString());
     Assert.assertEquals("1.2.3.4:8290", url.getAuthority());
-    Assert.assertEquals("/druid/worker/v1/task/foo+bar%26/log", url.getPath());
+    Assert.assertEquals("/druid/worker/v1/task/foo%20bar%26/log", url.getPath());
   }
 }
diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile
index 2b46de7..e3f3155 100644
--- a/integration-tests/docker/Dockerfile
+++ b/integration-tests/docker/Dockerfile
@@ -16,10 +16,12 @@
 # Base image is built from integration-tests/docker-base in the Druid repo
 FROM imply/druiditbase
 
+RUN echo "[mysqld]\ncharacter-set-server=utf8\ncollation-server=utf8_bin\n" >> /etc/mysql/my.cnf
+
 # Setup metadata store
 # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
 RUN find /var/lib/mysql -type f -exec touch {} \; && /etc/init.d/mysql start \
-      && echo "GRANT ALL ON druid.* TO 'druid'@'%' IDENTIFIED BY 'diurd'; CREATE database druid DEFAULT CHARACTER SET utf8;" | mysql -u root \
+      && echo "CREATE USER 'druid'@'%' IDENTIFIED BY 'diurd'; GRANT ALL ON druid.* TO 'druid'@'%'; CREATE database druid DEFAULT CHARACTER SET utf8mb4;" | mysql -u root \
       && /etc/init.d/mysql stop
 
 # Add Druid jars
diff --git a/integration-tests/docker/middlemanager.conf b/integration-tests/docker/middlemanager.conf
index 173829d..40adf19 100644
--- a/integration-tests/docker/middlemanager.conf
+++ b/integration-tests/docker/middlemanager.conf
@@ -14,7 +14,7 @@ command=java
   -Ddruid.worker.capacity=3
   -Ddruid.indexer.logs.directory=/shared/tasklogs
   -Ddruid.storage.storageDirectory=/shared/storage
-  -Ddruid.indexer.runner.javaOpts="-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml"
+  -Ddruid.indexer.runner.javaOpts="-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml"
   -Ddruid.indexer.fork.property.druid.processing.buffer.sizeBytes=25000000
   -Ddruid.indexer.fork.property.druid.processing.numThreads=1
   -Ddruid.indexer.fork.server.http.numThreads=100
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 653beba..49a6158 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -231,6 +231,7 @@
                                 -Dfile.encoding=UTF-8
                                 -Ddruid.test.config.dockerIp=${env.DOCKER_IP}
                                 -Ddruid.test.config.hadoopDir=${env.HADOOP_DIR}
+                                -Ddruid.test.config.extraDatasourceNameSuffix=\ Россия\ 한국\ 中国!?
                                 -Ddruid.zk.service.host=${env.DOCKER_IP}
                                 -Ddruid.client.https.trustStorePath=client_tls/truststore.jks
                                 -Ddruid.client.https.trustStorePassword=druid123
diff --git a/integration-tests/run_cluster.sh b/integration-tests/run_cluster.sh
index c08867a..49d046a 100755
--- a/integration-tests/run_cluster.sh
+++ b/integration-tests/run_cluster.sh
@@ -53,40 +53,44 @@ cp src/main/resources/log4j2.xml $SHARED_DIR/docker/lib/log4j2.xml
 # copy the integration test jar, it provides test-only extension implementations
 cp target/druid-integration-tests*.jar $SHARED_DIR/docker/lib
 
+# one of the integration tests needs the wikiticker sample data
+mkdir -p $SHARED_DIR/wikiticker-it
+cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz $SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz
+
 docker network create --subnet=172.172.172.0/24 druid-it-net
 
 # Build Druid Cluster Image
 docker build -t druid/cluster $SHARED_DIR/docker
 
 # Start zookeeper and kafka
-docker run -d --privileged --net druid-it-net --ip 172.172.172.2 --name druid-zookeeper-kafka -p 2181:2181 -p 9092:9092 -p 9093:9093 -v $SHARED_DIR:/shared -v $DOCKERDIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf -v $DOCKERDIR/kafka.conf:$SUPERVISORDIR/kafka.conf druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.2 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-zookeeper-kafka -p 2181:2181 -p 9092:9092 -p 9093:9093 -v $SHARED_DIR:/shared -v $DOCKERDIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf -v $DOCKERDIR/kafka.conf:$SUPERVISORDIR/kafka.conf druid/cluster
 
 # Start MYSQL 
-docker run -d --privileged --net druid-it-net --ip 172.172.172.3 --name druid-metadata-storage -v $SHARED_DIR:/shared -v $DOCKERDIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.3 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-metadata-storage -v $SHARED_DIR:/shared -v $DOCKERDIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster
 
 # Start Overlord
-docker run -d --privileged --net druid-it-net --ip 172.172.172.4 --name druid-overlord -p 8090:8090 -p 8290:8290 -v $SHARED_DIR:/shared -v $DOCKERDIR/overlord.conf:$SUPERVISORDIR/overlord.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.4 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-overlord -p 8090:8090 -p 8290:8290 -v $SHARED_DIR:/shared -v $DOCKERDIR/overlord.conf:$SUPERVISORDIR/overlord.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
 
 # Start Coordinator
-docker run -d --privileged --net druid-it-net --ip 172.172.172.5 --name druid-coordinator -p 8081:8081 -p 8281:8281 -v $SHARED_DIR:/shared -v $DOCKERDIR/coordinator.conf:$SUPERVISORDIR/coordinator.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.5 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-coordinator -p 8081:8081 -p 8281:8281 -v $SHARED_DIR:/shared -v $DOCKERDIR/coordinator.conf:$SUPERVISORDIR/coordinator.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
 
 # Start Historical 
-docker run -d --privileged --net druid-it-net --ip 172.172.172.6 --name druid-historical -p 8083:8083 -p 8283:8283 -v $SHARED_DIR:/shared -v $DOCKERDIR/historical.conf:$SUPERVISORDIR/historical.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.6 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-historical -p 8083:8083 -p 8283:8283 -v $SHARED_DIR:/shared -v $DOCKERDIR/historical.conf:$SUPERVISORDIR/historical.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
 
 # Start Middlemanger
-docker run -d --privileged --net druid-it-net --ip 172.172.172.7 --name druid-middlemanager -p 8091:8091 -p 8291:8291 -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8303:8303 -p 8304:8304 -p 8305:8305 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-overlord:druid-overlord druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.7 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-middlemanager -p 8091:8091 -p 8291:8291 -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8303:8303 -p 8304:8304 -p 8305:8305 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper-kafka:druid-zookeeper-ka [...]
 
 # Start Broker 
-docker run -d --privileged --net druid-it-net --ip 172.172.172.8 --name druid-broker -p 8082:8082 -p 8282:8282 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.8 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-broker -p 8082:8082 -p 8282:8282 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster
 
 # Start Router
-docker run -d --privileged --net druid-it-net --ip 172.172.172.9 --name druid-router -p 8888:8888 -p 9088:9088 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.9 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-router -p 8888:8888 -p 9088:9088 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
 
 # Start Router with permissive TLS settings (client auth enabled, no hostname verification, no revocation check)
-docker run -d --privileged --net druid-it-net --ip 172.172.172.10 --name druid-router-permissive-tls -p 8889:8889 -p 9089:9089 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-permissive-tls.conf:$SUPERVISORDIR/router-permissive-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.10 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-router-permissive-tls -p 8889:8889 -p 9089:9089 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-permissive-tls.conf:$SUPERVISORDIR/router-permissive-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
 
 # Start Router with TLS but no client auth
-docker run -d --privileged --net druid-it-net --ip 172.172.172.11 --name druid-router-no-client-auth-tls -p 8890:8890 -p 9090:9090 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-no-client-auth-tls.conf:$SUPERVISORDIR/router-no-client-auth-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.11 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-router-no-client-auth-tls -p 8890:8890 -p 9090:9090 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-no-client-auth-tls.conf:$SUPERVISORDIR/router-no-client-auth-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
 
 # Start Router with custom TLS cert checkers
-docker run -d --privileged --net druid-it-net --ip 172.172.172.12 --hostname druid-router-custom-check-tls --name druid-router-custom-check-tls -p 8891:8891 -p 9091:9091 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-custom-check-tls.conf:$SUPERVISORDIR/router-custom-check-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.12 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --hostname druid-router-custom-check-tls --name druid-router-custom-check-tls -p 8891:8891 -p 9091:9091 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-custom-check-tls.conf:$SUPERVISORDIR/router-custom-check-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
index 990b918..976eb89 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
@@ -348,6 +348,12 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
       {
         return Boolean.valueOf(props.getOrDefault("manageKafkaTopic", "true"));
       }
+
+      @Override
+      public String getExtraDatasourceNameSuffix()
+      {
+        return "";
+      }
     };
   }
 }
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
index 04d512a..7cd8d93 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
@@ -37,6 +37,9 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
   @NotNull
   private String hadoopDir;
 
+  @JsonProperty
+  private String extraDatasourceNameSuffix = "";
+
   @Override
   public IntegrationTestingConfig get()
   {
@@ -202,6 +205,12 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
       {
         return true;
       }
+
+      @Override
+      public String getExtraDatasourceNameSuffix()
+      {
+        return extraDatasourceNameSuffix;
+      }
     };
   }
 }
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
index ec321a0..f4e745f 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
@@ -82,4 +82,6 @@ public interface IntegrationTestingConfig
   Map<String, String> getProperties();
 
   boolean manageKafkaTopic();
+
+  String getExtraDatasourceNameSuffix();
 }
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/ClientInfoResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/ClientInfoResourceTestClient.java
index 63858d3..8c1b5d1 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/ClientInfoResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/ClientInfoResourceTestClient.java
@@ -72,7 +72,12 @@ public class ClientInfoResourceTestClient
       StatusResponseHolder response = httpClient.go(
           new Request(
               HttpMethod.GET,
-              new URL(StringUtils.format("%s/%s/dimensions?interval=%s", getBrokerURL(), dataSource, interval))
+              new URL(StringUtils.format(
+                  "%s/%s/dimensions?interval=%s",
+                  getBrokerURL(),
+                  StringUtils.urlEncode(dataSource),
+                  interval
+              ))
           ),
           responseHandler
       ).get();
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
index c3145aa..df6ad20 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
@@ -72,12 +72,12 @@ public class CoordinatorResourceTestClient
 
   private String getMetadataSegmentsURL(String dataSource)
   {
-    return StringUtils.format("%smetadata/datasources/%s/segments", getCoordinatorURL(), dataSource);
+    return StringUtils.format("%smetadata/datasources/%s/segments", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
   }
 
   private String getIntervalsURL(String dataSource)
   {
-    return StringUtils.format("%sdatasources/%s/intervals", getCoordinatorURL(), dataSource);
+    return StringUtils.format("%sdatasources/%s/intervals", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
   }
 
   private String getLoadStatusURL()
@@ -150,7 +150,7 @@ public class CoordinatorResourceTestClient
   public void unloadSegmentsForDataSource(String dataSource)
   {
     try {
-      makeRequest(HttpMethod.DELETE, StringUtils.format("%sdatasources/%s", getCoordinatorURL(), dataSource));
+      makeRequest(HttpMethod.DELETE, StringUtils.format("%sdatasources/%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource)));
     }
     catch (Exception e) {
       throw Throwables.propagate(e);
@@ -165,7 +165,7 @@ public class CoordinatorResourceTestClient
           StringUtils.format(
               "%sdatasources/%s/intervals/%s",
               getCoordinatorURL(),
-              dataSource,
+              StringUtils.urlEncode(dataSource),
               interval.toString().replace('/', '_')
           )
       );
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 45f55d6..5cee2fb 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -42,7 +42,6 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 
 import java.net.URL;
-import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
@@ -121,7 +120,7 @@ public class OverlordResourceTestClient
           StringUtils.format(
               "%stask/%s/status",
               getIndexerURL(),
-              URLEncoder.encode(taskID, "UTF-8")
+              StringUtils.urlEncode(taskID)
           )
       );
 
@@ -234,7 +233,7 @@ public class OverlordResourceTestClient
   {
     try {
       StatusResponseHolder response = httpClient.go(
-          new Request(HttpMethod.POST, new URL(StringUtils.format("%ssupervisor/%s/shutdown", getIndexerURL(), id))),
+          new Request(HttpMethod.POST, new URL(StringUtils.format("%ssupervisor/%s/shutdown", getIndexerURL(), StringUtils.urlEncode(id)))),
           responseHandler
       ).get();
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/TestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/TestQueryHelper.java
index 649557d..888979d 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/TestQueryHelper.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/TestQueryHelper.java
@@ -45,6 +45,7 @@ public class TestQueryHelper
   private final String brokerTLS;
   private final String router;
   private final String routerTLS;
+  private final IntegrationTestingConfig config;
 
   @Inject
   TestQueryHelper(
@@ -59,6 +60,7 @@ public class TestQueryHelper
     this.brokerTLS = config.getBrokerTLSUrl();
     this.router = config.getRouterUrl();
     this.routerTLS = config.getRouterTLSUrl();
+    this.config = config;
   }
 
   public void testQueriesFromFile(String filePath, int timesToRun) throws Exception
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index f417c14..b9bac8b 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -20,6 +20,9 @@
 package org.apache.druid.tests.indexer;
 
 import com.google.inject.Inject;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.ClientInfoResourceTestClient;
@@ -27,6 +30,7 @@ import org.apache.druid.testing.utils.RetryUtil;
 import org.junit.Assert;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 public class AbstractITBatchIndexTest extends AbstractIndexerTest
@@ -45,9 +49,31 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
       String queryFilePath
   ) throws IOException
   {
-    submitTaskAndWait(indexTaskFilePath, dataSource);
+    final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
+    final String taskSpec = StringUtils.replace(
+        getTaskAsString(indexTaskFilePath),
+        "%%DATASOURCE%%",
+        fullDatasourceName
+    );
+
+    submitTaskAndWait(taskSpec, fullDatasourceName);
     try {
-      queryHelper.testQueriesFromFile(queryFilePath, 2);
+
+      String queryResponseTemplate;
+      try {
+        InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath);
+        queryResponseTemplate = IOUtils.toString(is, "UTF-8");
+      }
+      catch (IOException e) {
+        throw new ISE(e, "could not read query file: %s", queryFilePath);
+      }
+
+      queryResponseTemplate = StringUtils.replace(
+          queryResponseTemplate,
+          "%%DATASOURCE%%",
+          fullDatasourceName
+      );
+      queryHelper.testQueriesFromString(queryResponseTemplate, 2);
 
     }
     catch (Exception e) {
@@ -57,17 +83,48 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
   }
 
   void doReindexTest(
+      String baseDataSource,
       String reindexDataSource,
       String reindexTaskFilePath,
       String queryFilePath
   ) throws IOException
   {
-    submitTaskAndWait(reindexTaskFilePath, reindexDataSource);
+    final String fullBaseDatasourceName = baseDataSource + config.getExtraDatasourceNameSuffix();
+    final String fullReindexDatasourceName = reindexDataSource + config.getExtraDatasourceNameSuffix();
+
+    String taskSpec = StringUtils.replace(
+        getTaskAsString(reindexTaskFilePath),
+        "%%DATASOURCE%%",
+        fullBaseDatasourceName
+    );
+
+    taskSpec = StringUtils.replace(
+        taskSpec,
+        "%%REINDEX_DATASOURCE%%",
+        fullReindexDatasourceName
+    );
+
+    submitTaskAndWait(taskSpec, fullReindexDatasourceName);
     try {
-      queryHelper.testQueriesFromFile(queryFilePath, 2);
+      String queryResponseTemplate;
+      try {
+        InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath);
+        queryResponseTemplate = IOUtils.toString(is, "UTF-8");
+      }
+      catch (IOException e) {
+        throw new ISE(e, "could not read query file: %s", queryFilePath);
+      }
+
+      queryResponseTemplate = StringUtils.replace(
+          queryResponseTemplate,
+          "%%DATASOURCE%%",
+          fullBaseDatasourceName
+      );
+
+      queryHelper.testQueriesFromString(queryResponseTemplate, 2);
       // verify excluded dimension is not reIndexed
       final List<String> dimensions = clientInfoResourceTestClient.getDimensions(
-          reindexDataSource,
+          fullReindexDatasourceName,
           "2013-08-31T00:00:00.000Z/2013-09-10T00:00:00.000Z"
       );
       Assert.assertFalse("dimensions : " + dimensions, dimensions.contains("robot"));
@@ -78,9 +135,9 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
     }
   }
 
-  private void submitTaskAndWait(String indexTaskFilePath, String dataSourceName) throws IOException
+  private void submitTaskAndWait(String taskSpec, String dataSourceName)
   {
-    final String taskID = indexer.submitTask(getTaskAsString(indexTaskFilePath));
+    final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for loading index task %s", taskID);
     indexer.waitUntilTaskCompletes(taskID);
 
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
index 176ea11..cdf61de 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
@@ -79,15 +79,21 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes
   @Inject
   IntegrationTestingConfig config;
 
+  private String fullDatasourceName;
+
   void doTest()
   {
+    fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix();
+
     LOG.info("Starting test: ITRealtimeIndexTaskTest");
-    try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
+    try (final Closeable closeable = unloader(fullDatasourceName)) {
       // the task will run for 3 minutes and then shutdown itself
       String task = setShutOffTime(
           getTaskAsString(getTaskResource()),
           DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
       );
+      task = StringUtils.replace(task, "%%DATASOURCE%%", fullDatasourceName);
+
       LOG.info("indexerSpec: [%s]\n", task);
       taskID = indexer.submitTask(task);
 
@@ -119,6 +125,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes
       queryStr = StringUtils.replace(queryStr, "%%POST_AG_REQUEST_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)));
       String postAgResponseTimestamp = TIMESTAMP_FMT.print(dtGroupBy.withSecondOfMinute(0));
       queryStr = StringUtils.replace(queryStr, "%%POST_AG_RESPONSE_TIMESTAMP%%", postAgResponseTimestamp);
+      queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName);
 
       // should hit the queries all on realtime task or some on realtime task
       // and some on historical.  Which it is depends on where in the minute we were
@@ -140,7 +147,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes
             @Override
             public Boolean call()
             {
-              return coordinator.areSegmentsLoaded(INDEX_DATASOURCE);
+              return coordinator.areSegmentsLoaded(fullDatasourceName);
             }
           },
           true,
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index 7f78192..b739d79 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -25,6 +25,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
 import org.apache.druid.testing.clients.OverlordResourceTestClient;
 import org.apache.druid.testing.utils.RetryUtil;
@@ -54,6 +55,9 @@ public abstract class AbstractIndexerTest
   @Inject
   protected TestQueryHelper queryHelper;
 
+  @Inject
+  private IntegrationTestingConfig config;
+
   protected Closeable unloader(final String dataSource)
   {
     return () -> unloadAndKillData(dataSource);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 5ae32fa..db6ebff 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -19,15 +19,21 @@
 
 package org.apache.druid.tests.indexer;
 
+import com.google.inject.Inject;
+import org.apache.commons.io.IOUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.apache.druid.testing.utils.RetryUtil;
+import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 @Guice(moduleFactory = DruidTestModuleFactory.class)
@@ -39,23 +45,49 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   private static String INDEX_DATASOURCE = "wikipedia_index_test";
   private static String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
 
+  @Inject
+  private IntegrationTestingConfig config;
+
+  private String fullDatasourceName;
+
+  @BeforeSuite
+  public void setFullDatasourceName()
+  {
+    fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix();
+  }
+
   @Test
   public void testCompactionWithoutKeepSegmentGranularity() throws Exception
   {
     loadData();
-    final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
+    final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
     intervalsBeforeCompaction.sort(null);
     final String compactedInterval = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z";
     if (intervalsBeforeCompaction.contains(compactedInterval)) {
       throw new ISE("Containing a segment for the compacted interval[%s] before compaction", compactedInterval);
     }
-    try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
-      queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+    try (final Closeable closeable = unloader(fullDatasourceName)) {
+      String queryResponseTemplate;
+      try {
+        InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(INDEX_QUERIES_RESOURCE);
+        queryResponseTemplate = IOUtils.toString(is, "UTF-8");
+      }
+      catch (IOException e) {
+        throw new ISE(e, "could not read query file: %s", INDEX_QUERIES_RESOURCE);
+      }
+
+      queryResponseTemplate = StringUtils.replace(
+          queryResponseTemplate,
+          "%%DATASOURCE%%",
+          fullDatasourceName
+      );
+
+      queryHelper.testQueriesFromString(queryResponseTemplate, 2);
       compactData(false);
 
       // 4 segments across 2 days, compacted into 1 new segment (5 total)
       checkCompactionFinished(5);
-      queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+      queryHelper.testQueriesFromString(queryResponseTemplate, 2);
 
       intervalsBeforeCompaction.add(compactedInterval);
       intervalsBeforeCompaction.sort(null);
@@ -67,15 +99,31 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   public void testCompactionWithKeepSegmentGranularity() throws Exception
   {
     loadData();
-    final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
+    final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
     intervalsBeforeCompaction.sort(null);
-    try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
-      queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+    try (final Closeable closeable = unloader(fullDatasourceName)) {
+      String queryResponseTemplate;
+      try {
+        InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(INDEX_QUERIES_RESOURCE);
+        queryResponseTemplate = IOUtils.toString(is, "UTF-8");
+      }
+      catch (IOException e) {
+        throw new ISE(e, "could not read query file: %s", INDEX_QUERIES_RESOURCE);
+      }
+
+      queryResponseTemplate = StringUtils.replace(
+          queryResponseTemplate,
+          "%%DATASOURCE%%",
+          fullDatasourceName
+      );
+
+
+      queryHelper.testQueriesFromString(queryResponseTemplate, 2);
       compactData(true);
 
       // 4 segments across 2 days, compacted into 2 new segments (6 total)
       checkCompactionFinished(6);
-      queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+      queryHelper.testQueriesFromString(queryResponseTemplate, 2);
 
       checkCompactionIntervals(intervalsBeforeCompaction);
     }
@@ -83,12 +131,14 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
 
   private void loadData() throws Exception
   {
-    final String taskID = indexer.submitTask(getTaskAsString(INDEX_TASK));
+    String taskSpec = getTaskAsString(INDEX_TASK);
+    taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
+    final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for loading index task %s", taskID);
     indexer.waitUntilTaskCompletes(taskID);
 
     RetryUtil.retryUntilTrue(
-        () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE),
+        () -> coordinator.areSegmentsLoaded(fullDatasourceName),
         "Segment Load"
     );
   }
@@ -96,14 +146,16 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   private void compactData(boolean keepSegmentGranularity) throws Exception
   {
     final String template = getTaskAsString(COMPACTION_TASK);
-    final String taskSpec =
+    String taskSpec =
         StringUtils.replace(template, "${KEEP_SEGMENT_GRANULARITY}", Boolean.toString(keepSegmentGranularity));
+    taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
+
     final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for compaction task %s", taskID);
     indexer.waitUntilTaskCompletes(taskID);
 
     RetryUtil.retryUntilTrue(
-        () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE),
+        () -> coordinator.areSegmentsLoaded(fullDatasourceName),
         "Segment Compaction"
     );
   }
@@ -112,7 +164,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   {
     RetryUtil.retryUntilTrue(
         () -> {
-          int metadataSegmentCount = coordinator.getMetadataSegments(INDEX_DATASOURCE).size();
+          int metadataSegmentCount = coordinator.getMetadataSegments(fullDatasourceName).size();
           LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments);
           return metadataSegmentCount == numExpectedSegments;
         },
@@ -124,7 +176,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   {
     RetryUtil.retryUntilTrue(
         () -> {
-          final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
+          final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
           intervalsAfterCompaction.sort(null);
           System.out.println("AFTER: " + intervalsAfterCompaction);
           System.out.println("EXPECTED: " + expectedIntervals);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index 63681e2..8412e49 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -31,6 +31,7 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
   private static String INDEX_TASK = "/indexer/wikipedia_index_task.json";
   private static String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
   private static String INDEX_DATASOURCE = "wikipedia_index_test";
+
   private static String REINDEX_TASK = "/indexer/wikipedia_reindex_task.json";
   private static String REINDEX_DATASOURCE = "wikipedia_reindex_test";
 
@@ -38,8 +39,8 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
   public void testIndexData() throws Exception
   {
     try (
-        final Closeable indexCloseable = unloader(INDEX_DATASOURCE);
-        final Closeable reindexCloseable = unloader(REINDEX_DATASOURCE)
+        final Closeable indexCloseable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
+        final Closeable reindexCloseable = unloader(REINDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
     ) {
       doIndexTestTest(
           INDEX_DATASOURCE,
@@ -47,6 +48,7 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
           INDEX_QUERIES_RESOURCE
       );
       doReindexTest(
+          INDEX_DATASOURCE,
           REINDEX_DATASOURCE,
           REINDEX_TASK,
           INDEX_QUERIES_RESOURCE
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
index 3306da9..247fd7e 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
@@ -44,6 +44,7 @@ import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
@@ -64,6 +65,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
   private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
   private static final String DATASOURCE = "kafka_indexing_service_test";
   private static final String TOPIC_NAME = "kafka_indexing_service_topic";
+
   private static final int NUM_EVENTS_TO_SEND = 60;
   private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L;
   public static final String testPropertyPrefix = "kafka.test.property.";
@@ -105,6 +107,14 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
   @Inject
   private IntegrationTestingConfig config;
 
+  private String fullDatasourceName;
+
+  @BeforeSuite
+  public void setFullDatasourceName()
+  {
+    fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix();
+  }
+
   @Test
   public void testKafka()
   {
@@ -143,7 +153,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
       addFilteredProperties(consumerProperties);
 
       spec = getTaskAsString(INDEXER_FILE);
-      spec = StringUtils.replace(spec, "%%DATASOURCE%%", DATASOURCE);
+      spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName);
       spec = StringUtils.replace(spec, "%%TOPIC%%", TOPIC_NAME);
       spec = StringUtils.replace(spec, "%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties));
       LOG.info("supervisorSpec: [%s]\n", spec);
@@ -228,7 +238,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
     }
 
     String queryStr = query_response_template;
-    queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", DATASOURCE);
+    queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName);
     queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
     queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast));
     queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst));
@@ -271,7 +281,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
             @Override
             public Boolean call()
             {
-              return coordinator.areSegmentsLoaded(DATASOURCE);
+              return coordinator.areSegmentsLoaded(fullDatasourceName);
             }
           },
           true,
@@ -306,7 +316,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
 
     // remove segments
     if (segmentsExist) {
-      unloadAndKillData(DATASOURCE);
+      unloadAndKillData(fullDatasourceName);
     }
   }
 
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java
index 3388efe..af7e829 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java
@@ -44,6 +44,7 @@ import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
@@ -106,6 +107,13 @@ public class ITKafkaTest extends AbstractIndexerTest
   @Inject
   private IntegrationTestingConfig config;
 
+  private String fullDatasourceName;
+
+  @BeforeSuite
+  public void setFullDatasourceName()
+  {
+    fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix();
+  }
   @Test
   public void testKafka()
   {
@@ -204,7 +212,7 @@ public class ITKafkaTest extends AbstractIndexerTest
       addFilteredProperties(consumerProperties);
 
       indexerSpec = getTaskAsString(INDEXER_FILE);
-      indexerSpec = StringUtils.replace(indexerSpec, "%%DATASOURCE%%", DATASOURCE);
+      indexerSpec = StringUtils.replace(indexerSpec, "%%DATASOURCE%%", fullDatasourceName);
       indexerSpec = StringUtils.replace(indexerSpec, "%%TOPIC%%", TOPIC_NAME);
       indexerSpec = StringUtils.replace(indexerSpec, "%%COUNT%%", Integer.toString(num_events));
       String consumerPropertiesJson = jsonMapper.writeValueAsString(consumerProperties);
@@ -233,7 +241,7 @@ public class ITKafkaTest extends AbstractIndexerTest
             @Override
             public Boolean call()
             {
-              return coordinator.areSegmentsLoaded(DATASOURCE);
+              return coordinator.areSegmentsLoaded(fullDatasourceName);
             }
           },
           true,
@@ -263,7 +271,7 @@ public class ITKafkaTest extends AbstractIndexerTest
     }
 
     String queryStr = queryResponseTemplate;
-    queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", DATASOURCE);
+    queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName);
     // time boundary
     queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
     queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast));
@@ -296,7 +304,7 @@ public class ITKafkaTest extends AbstractIndexerTest
 
     // remove segments
     if (segmentsExist) {
-      unloadAndKillData(DATASOURCE);
+      unloadAndKillData(fullDatasourceName);
     }
   }
 
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java
index 90ba304..350e2ab 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java
@@ -21,6 +21,9 @@ package org.apache.druid.tests.indexer;
 
 import com.google.common.base.Throwables;
 import com.google.inject.Inject;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.ClientInfoResourceTestClient;
@@ -28,9 +31,13 @@ import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.apache.druid.testing.utils.RetryUtil;
 import org.apache.druid.testing.utils.TestQueryHelper;
+import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
+import java.io.InputStream;
+
 @Guice(moduleFactory = DruidTestModuleFactory.class)
 public class ITNestedQueryPushDownTest extends AbstractIndexerTest
 {
@@ -51,12 +58,36 @@ public class ITNestedQueryPushDownTest extends AbstractIndexerTest
   @Inject
   ClientInfoResourceTestClient clientInfoResourceTestClient;
 
+  private String fullDatasourceName;
+
+  @BeforeSuite
+  public void setFullDatasourceName()
+  {
+    fullDatasourceName = WIKITICKER_DATA_SOURCE + config.getExtraDatasourceNameSuffix();
+  }
+
   @Test
   public void testIndexData()
   {
     try {
       loadData();
-      queryHelper.testQueriesFromFile(WIKITICKER_QUERIES_RESOURCE, 2);
+
+      String queryResponseTemplate;
+      try {
+        InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(WIKITICKER_QUERIES_RESOURCE);
+        queryResponseTemplate = IOUtils.toString(is, "UTF-8");
+      }
+      catch (IOException e) {
+        throw new ISE(e, "could not read query file: %s", WIKITICKER_QUERIES_RESOURCE);
+      }
+
+      queryResponseTemplate = StringUtils.replace(
+          queryResponseTemplate,
+          "%%DATASOURCE%%",
+          fullDatasourceName
+      );
+
+      queryHelper.testQueriesFromString(queryResponseTemplate, 2);
     }
     catch (Exception e) {
       LOG.error(e, "Error while testing");
@@ -66,11 +97,13 @@ public class ITNestedQueryPushDownTest extends AbstractIndexerTest
 
   private void loadData() throws Exception
   {
-    final String taskID = indexer.submitTask(getTaskAsString(WIKITICKER_INDEX_TASK));
+    String taskSpec = getTaskAsString(WIKITICKER_INDEX_TASK);
+    taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
+    final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for loading index task %s", taskID);
     indexer.waitUntilTaskCompletes(taskID);
     RetryUtil.retryUntilTrue(
-        () -> coordinator.areSegmentsLoaded(WIKITICKER_DATA_SOURCE), "Segment Load"
+        () -> coordinator.areSegmentsLoaded(fullDatasourceName), "Segment Load"
     );
   }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
index b844acd..80ca6e1 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
@@ -35,7 +35,7 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest
   @Test
   public void testIndexData() throws Exception
   {
-    try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
+    try (final Closeable closeable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix())) {
       doIndexTestTest(
           INDEX_DATASOURCE,
           INDEX_TASK,
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
index 38adae8..8b65c40 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
@@ -20,9 +20,11 @@
 package org.apache.druid.tests.indexer;
 
 import com.google.inject.Inject;
+import org.apache.commons.io.IOUtils;
 import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
 import org.apache.druid.curator.discovery.ServerDiscoverySelector;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -39,10 +41,12 @@ import org.apache.druid.testing.utils.ServerDiscoveryUtil;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
+import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -70,13 +74,21 @@ public class ITUnionQueryTest extends AbstractIndexerTest
   @Inject
   IntegrationTestingConfig config;
 
+  private String fullDatasourceName;
+
+  @BeforeSuite
+  public void setFullDatasourceName()
+  {
+    fullDatasourceName = UNION_DATASOURCE + config.getExtraDatasourceNameSuffix();
+  }
+
   @Test
   public void testUnionQuery() throws IOException
   {
     final int numTasks = 3;
     final Closer closer = Closer.create();
     for (int i = 0; i < numTasks; i++) {
-      closer.register(unloader(UNION_DATASOURCE + i));
+      closer.register(unloader(fullDatasourceName + i));
     }
     try {
       // Load 4 datasources with same dimensions
@@ -89,7 +101,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
         taskIDs.add(
             indexer.submitTask(
                 withServiceName(
-                    withDataSource(task, UNION_DATASOURCE + i),
+                    withDataSource(task, fullDatasourceName + i),
                     EVENT_RECEIVER_SERVICE_PREFIX + i
                 )
             )
@@ -103,9 +115,9 @@ public class ITUnionQueryTest extends AbstractIndexerTest
       RetryUtil.retryUntil(
           () -> {
             for (int i = 0; i < numTasks; i++) {
-              final int countRows = queryHelper.countRows(UNION_DATASOURCE + i, "2013-08-31/2013-09-01");
+              final int countRows = queryHelper.countRows(fullDatasourceName + i, "2013-08-31/2013-09-01");
               if (countRows < 5) {
-                LOG.warn("%d events have been ingested to %s so far", countRows, UNION_DATASOURCE + i);
+                LOG.warn("%d events have been ingested to %s so far", countRows, fullDatasourceName + i);
                 return false;
               }
             }
@@ -119,7 +131,23 @@ public class ITUnionQueryTest extends AbstractIndexerTest
 
       // should hit the queries on realtime task
       LOG.info("Running Union Queries..");
-      this.queryHelper.testQueriesFromFile(UNION_QUERIES_RESOURCE, 2);
+
+      String queryResponseTemplate;
+      try {
+        InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(UNION_QUERIES_RESOURCE);
+        queryResponseTemplate = IOUtils.toString(is, "UTF-8");
+      }
+      catch (IOException e) {
+        throw new ISE(e, "could not read query file: %s", UNION_QUERIES_RESOURCE);
+      }
+
+      queryResponseTemplate = StringUtils.replace(
+          queryResponseTemplate,
+          "%%DATASOURCE%%",
+          fullDatasourceName
+      );
+
+      this.queryHelper.testQueriesFromString(queryResponseTemplate, 2);
 
       // wait for the task to complete
       for (int i = 0; i < numTasks; i++) {
@@ -134,7 +162,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
               @Override
               public Boolean call()
               {
-                return coordinator.areSegmentsLoaded(UNION_DATASOURCE + taskNum);
+                return coordinator.areSegmentsLoaded(fullDatasourceName + taskNum);
               }
             },
             true,
@@ -144,7 +172,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
         );
       }
       // run queries on historical nodes
-      this.queryHelper.testQueriesFromFile(UNION_QUERIES_RESOURCE, 2);
+      this.queryHelper.testQueriesFromString(queryResponseTemplate, 2);
 
     }
     catch (Throwable e) {
@@ -162,7 +190,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
 
   private String withDataSource(String taskAsString, String dataSource)
   {
-    return StringUtils.replace(taskAsString, UNION_DATASOURCE, dataSource);
+    return StringUtils.replace(taskAsString, "%%DATASOURCE%%", dataSource);
   }
 
   private String withServiceName(String taskAsString, String serviceName)
diff --git a/integration-tests/src/test/resources/indexer/union_queries.json b/integration-tests/src/test/resources/indexer/union_queries.json
index fa63e84..627af04 100644
--- a/integration-tests/src/test/resources/indexer/union_queries.json
+++ b/integration-tests/src/test/resources/indexer/union_queries.json
@@ -6,8 +6,8 @@
             "dataSource": {
                 "type": "union",
                 "dataSources": [
-                    "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-                    "wikipedia_index_test0"
+                    "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+                    "%%DATASOURCE%%0"
                 ]
             },
             "intervals": ["2013-08-31/2013-09-01"],
@@ -69,8 +69,8 @@
             "dataSource": {
                 "type": "union",
                 "dataSources": [
-                    "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-                    "wikipedia_index_test0"
+                    "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+                    "%%DATASOURCE%%0"
                 ]
             },
             "intervals": ["2013-08-31/2013-09-01"],
@@ -149,8 +149,8 @@
             "dataSource": {
                 "type": "union",
                 "dataSources": [
-                    "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-                    "wikipedia_index_test0"
+                    "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+                    "%%DATASOURCE%%0"
                 ]
             },
             "intervals": ["2013-08-31/2013-09-01"],
@@ -263,8 +263,8 @@
             "dataSource": {
                 "type": "union",
                 "dataSources": [
-                    "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-                    "wikipedia_index_test0"
+                    "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+                    "%%DATASOURCE%%0"
                 ]
             },
             "intervals": ["2013-08-31/2013-09-01"],
@@ -344,8 +344,8 @@
             "dataSource": {
                 "type": "union",
                 "dataSources": [
-                    "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-                    "wikipedia_index_test0"
+                    "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+                    "%%DATASOURCE%%0"
                 ]
             },
             "intervals": ["2013-08-31/2013-09-01"],
@@ -417,8 +417,8 @@
             "dataSource": {
                 "type": "union",
                 "dataSources": [
-                    "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-                    "wikipedia_index_test0"
+                    "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+                    "%%DATASOURCE%%0"
                 ]
             },
             "intervals": ["2013-08-31/2013-09-01"],
@@ -508,8 +508,8 @@
             "dataSource": {
                 "type": "union",
                 "dataSources": [
-                    "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-                    "wikipedia_index_test0"
+                    "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+                    "%%DATASOURCE%%0"
                 ]
             },
             "granularity": "all",
@@ -548,8 +548,8 @@
             "dataSource": {
                 "type": "union",
                 "dataSources": [
-                    "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-                    "wikipedia_index_test0"
+                    "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+                    "%%DATASOURCE%%0"
                 ]
             }
         },
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
index 3fdad69..1f7cb44 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
@@ -1,6 +1,6 @@
 {
   "type" : "compact",
-  "dataSource" : "wikipedia_index_test",
+  "dataSource" : "%%DATASOURCE%%",
   "interval" : "2013-08-31/2013-09-02",
   "keepSegmentGranularity" : ${KEEP_SEGMENT_GRANULARITY}
 }
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json
index 04565bd..9618ba9 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json
@@ -3,7 +3,7 @@
         "description": "timeseries, 1 agg, all",
         "query":{
             "queryType" : "timeBoundary",
-            "dataSource": "wikipedia_index_test"
+            "dataSource": "%%DATASOURCE%%"
         },
         "expectedResults":[
             {
@@ -20,7 +20,7 @@
         "description":"having spec on post aggregation",
         "query":{
             "queryType":"groupBy",
-            "dataSource":"wikipedia_index_test",
+            "dataSource":"%%DATASOURCE%%",
             "granularity":"day",
             "dimensions":[
                 "page"
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
index 819caae..23532e5 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
@@ -2,7 +2,7 @@
     "type": "index",
     "spec": {
         "dataSchema": {
-            "dataSource": "wikipedia_index_test",
+            "dataSource": "%%DATASOURCE%%",
             "metricsSpec": [
                 {
                     "type": "count",
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_queries.json
index 76ecb5c..9618ba9 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_queries.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_queries.json
@@ -3,7 +3,7 @@
         "description": "timeseries, 1 agg, all",
         "query":{
             "queryType" : "timeBoundary",
-            "dataSource": "wikipedia_parallel_index_test"
+            "dataSource": "%%DATASOURCE%%"
         },
         "expectedResults":[
             {
@@ -20,7 +20,7 @@
         "description":"having spec on post aggregation",
         "query":{
             "queryType":"groupBy",
-            "dataSource":"wikipedia_parallel_index_test",
+            "dataSource":"%%DATASOURCE%%",
             "granularity":"day",
             "dimensions":[
                 "page"
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json
index 911adbd..f317c53 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json
@@ -2,7 +2,7 @@
     "type": "index_parallel",
     "spec": {
         "dataSchema": {
-            "dataSource": "wikipedia_parallel_index_test",
+            "dataSource": "%%DATASOURCE%%",
             "metricsSpec": [
                 {
                     "type": "count",
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_queries.json
index acd88ca..46d5ec4 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_queries.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_queries.json
@@ -3,7 +3,7 @@
    "description": "timeBoundary",
    "query": {
       "queryType":"timeBoundary",
-      "dataSource":"wikipedia_index_test"
+      "dataSource":"%%DATASOURCE%%"
    },
    "expectedResults":[
       {
@@ -19,7 +19,7 @@
    "description": "timeseries",
    "query": {
       "queryType": "timeseries",
-      "dataSource": "wikipedia_index_test",
+      "dataSource": "%%DATASOURCE%%",
       "intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ],
       "granularity": "all",
       "aggregations": [
@@ -41,7 +41,7 @@
      "description":"having spec on post aggregation",
      "query":{ 
      "queryType":"groupBy",
-     "dataSource":"wikipedia_index_test",
+     "dataSource":"%%DATASOURCE%%",
      "granularity":"minute",
      "dimensions":[
           "page"
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json
index 765914b..9e77360 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json
@@ -2,7 +2,7 @@
   "type": "index_realtime_appenderator",
   "spec": {
     "dataSchema": {
-      "dataSource": "wikipedia_index_test",
+      "dataSource": "%%DATASOURCE%%",
       "metricsSpec": [
         {
           "type": "count",
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json
index b579b34..bb67595 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json
@@ -3,7 +3,7 @@
    "description": "timeBoundary",
    "query": {
       "queryType":"timeBoundary",
-      "dataSource":"wikipedia_index_test"
+      "dataSource":"%%DATASOURCE%%"
    },
    "expectedResults":[
       {
@@ -19,7 +19,7 @@
    "description": "timeseries",
    "query": {
       "queryType": "timeseries",
-      "dataSource": "wikipedia_index_test",
+      "dataSource": "%%DATASOURCE%%",
       "intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ],
       "granularity": "all",
       "aggregations": [
@@ -41,7 +41,7 @@
      "description":"having spec on post aggregation",
      "query":{ 
      "queryType":"groupBy",
-     "dataSource":"wikipedia_index_test",
+     "dataSource":"%%DATASOURCE%%",
      "granularity":"minute",
      "dimensions":[
           "page"
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_task.json
index ecfff57..5f48162 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_task.json
@@ -2,7 +2,7 @@
   "type": "index_realtime",
   "spec": {
     "dataSchema": {
-      "dataSource": "wikipedia_index_test",
+      "dataSource": "%%DATASOURCE%%",
       "metricsSpec": [
         {
           "type": "count",
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_reindex_task.json b/integration-tests/src/test/resources/indexer/wikipedia_reindex_task.json
index b63f9f1..e277a91 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_reindex_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_reindex_task.json
@@ -2,7 +2,7 @@
     "type": "index",
     "spec": {
         "dataSchema": {
-            "dataSource": "wikipedia_reindex_test",
+            "dataSource": "%%REINDEX_DATASOURCE%%",
             "metricsSpec": [
                 {
                     "type": "doubleSum",
@@ -42,7 +42,7 @@
             "type": "index",
             "firehose": {
                 "type": "ingestSegment",
-                "dataSource": "wikipedia_index_test",
+                "dataSource": "%%DATASOURCE%%",
                 "interval": "2013-08-31/2013-09-01"
             }
         },
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_union_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_union_index_task.json
index 09af36e..75c1281 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_union_index_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_union_index_task.json
@@ -2,7 +2,7 @@
   "type": "index_realtime",
   "spec": {
     "dataSchema": {
-      "dataSource": "wikipedia_index_test",
+      "dataSource": "%%DATASOURCE%%",
       "metricsSpec": [
         {
           "type": "count",
diff --git a/integration-tests/src/test/resources/indexer/wikiticker_index_task.json b/integration-tests/src/test/resources/indexer/wikiticker_index_task.json
index 5af8ae8..d450c7b 100644
--- a/integration-tests/src/test/resources/indexer/wikiticker_index_task.json
+++ b/integration-tests/src/test/resources/indexer/wikiticker_index_task.json
@@ -2,7 +2,7 @@
   "type": "index",
   "spec": {
     "dataSchema": {
-      "dataSource": "wikiticker",
+      "dataSource": "%%DATASOURCE%%",
       "granularitySpec": {
         "type": "uniform",
         "segmentGranularity": "day",
@@ -18,20 +18,7 @@
           "dimensionsSpec": {
             "dimensions": [
               "channel",
-              "cityName",
-              "comment",
-              "countryIsoCode",
-              "countryName",
-              "isAnonymous",
-              "isMinor",
-              "isNew",
-              "isRobot",
-              "isUnpatrolled",
-              "metroCode",
-              "namespace",
               "page",
-              "regionIsoCode",
-              "regionName",
               "user"
             ]
           },
@@ -60,11 +47,6 @@
           "name": "delta",
           "type": "longSum",
           "fieldName": "delta"
-        },
-        {
-          "name": "user_unique",
-          "type": "hyperUnique",
-          "fieldName": "user"
         }
       ]
     },
@@ -72,7 +54,7 @@
       "type": "index",
       "firehose": {
         "type": "local",
-        "baseDir": "/examples/quickstart/tutorial",
+        "baseDir": "/shared/wikiticker-it",
         "filter": "wikiticker-2015-09-12-sampled.json.gz"
       }
     },
diff --git a/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json b/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json
index 4c0350c..c7a062c 100644
--- a/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json
+++ b/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json
@@ -7,7 +7,7 @@
         "type": "query",
         "query": {
           "queryType": "groupBy",
-          "dataSource": "wikiticker",
+          "dataSource": "%%DATASOURCE%%",
           "intervals": [
             "2015-09-12/2015-09-13"
           ],
@@ -60,7 +60,7 @@
         "type": "query",
         "query": {
           "queryType": "groupBy",
-          "dataSource": "wikiticker",
+          "dataSource": "%%DATASOURCE%%",
           "intervals": [
             "2015-09-12/2015-09-13"
           ],
@@ -113,7 +113,7 @@
         "type": "query",
         "query": {
           "queryType": "groupBy",
-          "dataSource": "wikiticker",
+          "dataSource": "%%DATASOURCE%%",
           "intervals": [
             "2015-09-12/2015-09-13"
           ],
@@ -191,7 +191,7 @@
         "type": "query",
         "query": {
           "queryType": "groupBy",
-          "dataSource": "wikiticker",
+          "dataSource": "%%DATASOURCE%%",
           "intervals": [
             "2015-09-12/2015-09-13"
           ],
@@ -253,7 +253,7 @@
         "type": "query",
         "query": {
           "queryType": "groupBy",
-          "dataSource": "wikiticker",
+          "dataSource": "%%DATASOURCE%%",
           "intervals": [
             "2015-09-12/2015-09-13"
           ],
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index ef731b3..631c808 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -54,7 +54,7 @@ public class CoordinatorClient
               HttpMethod.GET,
               StringUtils.format(
                   "/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s",
-                  dataSource,
+                  StringUtils.urlEncode(dataSource),
                   descriptor.getInterval(),
                   descriptor.getPartitionNumber(),
                   descriptor.getVersion()
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index d31ef1a..702d78b 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -154,7 +154,10 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
       final FullResponseHolder response = druidLeaderClient.go(
           druidLeaderClient.makeRequest(
               HttpMethod.POST,
-              StringUtils.format("/druid/indexer/v1/task/%s/shutdown", taskId)
+              StringUtils.format(
+                  "/druid/indexer/v1/task/%s/shutdown",
+                  StringUtils.urlEncode(taskId)
+              )
           )
       );
 
@@ -255,7 +258,10 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
   {
     try {
       final FullResponseHolder responseHolder = druidLeaderClient.go(
-          druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format("/druid/indexer/v1/task/%s/status", taskId))
+          druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format(
+              "/druid/indexer/v1/task/%s/status",
+              StringUtils.urlEncode(taskId)
+          ))
       );
 
       return jsonMapper.readValue(
@@ -303,7 +309,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
   {
     final String endPoint = StringUtils.format(
         "/druid/indexer/v1/pendingSegments/%s?interval=%s",
-        dataSource,
+        StringUtils.urlEncode(dataSource),
         new Interval(DateTimes.MIN, end)
     );
     try {
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java
index 693b302..9e64731 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.realtime.firehose;
 
 import com.google.common.base.Optional;
 import com.google.inject.Inject;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
 
 import javax.ws.rs.Path;
@@ -49,7 +50,7 @@ public class ChatHandlerResource
   {
     if (taskId != null) {
       List<String> requestTaskId = headers.getRequestHeader(TASK_ID_HEADER);
-      if (requestTaskId != null && !requestTaskId.contains(taskId)) {
+      if (requestTaskId != null && !requestTaskId.contains(StringUtils.urlEncode(taskId))) {
         return null;
       }
     }
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/TaskIdResponseHeaderFilterHolder.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/TaskIdResponseHeaderFilterHolder.java
index 6302a39..7dcd3b2 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/TaskIdResponseHeaderFilterHolder.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/TaskIdResponseHeaderFilterHolder.java
@@ -20,6 +20,7 @@
 package org.apache.druid.server.initialization.jetty;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerResource;
 
 public class TaskIdResponseHeaderFilterHolder extends ResponseHeaderFilterHolder
@@ -29,7 +30,7 @@ public class TaskIdResponseHeaderFilterHolder extends ResponseHeaderFilterHolder
     super(path,
           taskId == null
           ? ImmutableMap.of()
-          : ImmutableMap.of(ChatHandlerResource.TASK_ID_HEADER, taskId)
+          : ImmutableMap.of(ChatHandlerResource.TASK_ID_HEADER, StringUtils.urlEncode(taskId))
     );
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org