You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2017/11/30 04:55:54 UTC

[09/14] asterixdb git commit: [NO ISSUE] Delete asterix-experiments

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMExperimentSetRunner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMExperimentSetRunner.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMExperimentSetRunner.java
deleted file mode 100644
index 45073d9..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMExperimentSetRunner.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.regex.Pattern;
-
-import org.apache.asterix.experiment.action.base.SequentialActionList;
-import org.apache.asterix.experiment.builder.AbstractExperimentBuilder;
-import org.apache.asterix.experiment.builder.PerfTestAggBuilder;
-import org.apache.asterix.experiment.builder.PresetClusterPerfBuilder;
-import org.kohsuke.args4j.CmdLineException;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-public class LSMExperimentSetRunner {
-
-    private static final Logger LOGGER = Logger.getLogger(LSMExperimentSetRunner.class.getName());
-
-    public static class LSMExperimentSetRunnerConfig {
-
-        private final String logDirSuffix;
-
-        private final int nQueryRuns;
-
-        public LSMExperimentSetRunnerConfig(String logDirSuffix, int nQueryRuns) {
-            this.logDirSuffix = logDirSuffix;
-            this.nQueryRuns = nQueryRuns;
-        }
-
-        public String getLogDirSuffix() {
-            return logDirSuffix;
-        }
-
-        public int getNQueryRuns() {
-            return nQueryRuns;
-        }
-
-        @Option(name = "-rh", aliases = "--rest-host", usage = "Asterix REST API host address", required = true, metaVar = "HOST")
-        private String restHost;
-
-        public String getRESTHost() {
-            return restHost;
-        }
-
-        @Option(name = "-rp", aliases = "--rest-port", usage = "Asterix REST API port", required = true, metaVar = "PORT")
-        private int restPort;
-
-        public int getRESTPort() {
-            return restPort;
-        }
-
-        @Option(name = "-mh", aliases = "--managix-home", usage = "Path to MANAGIX_HOME directory", required = true, metaVar = "MGXHOME")
-        private String managixHome;
-
-        public String getManagixHome() {
-            return managixHome;
-        }
-
-        @Option(name = "-jh", aliases = "--java-home", usage = "Path to JAVA_HOME directory", required = true, metaVar = "JAVAHOME")
-        private String javaHome;
-
-        public String getJavaHome() {
-            return javaHome;
-        }
-
-        @Option(name = "-ler", aliases = "--local-experiment-root", usage = "Path to the local LSM experiment root directory", required = true, metaVar = "LOCALEXPROOT")
-        private String localExperimentRoot;
-
-        public String getLocalExperimentRoot() {
-            return localExperimentRoot;
-        }
-
-        @Option(name = "-u", aliases = "--username", usage = "Username to use for SSH/SCP", required = true, metaVar = "UNAME")
-        private String username;
-
-        public String getUsername() {
-            return username;
-        }
-
-        @Option(name = "-k", aliases = "--key", usage = "SSH key location", metaVar = "SSHKEY")
-        private String sshKeyLocation;
-
-        public String getSSHKeyLocation() {
-            return sshKeyLocation;
-        }
-
-        @Option(name = "-d", aliases = "--datagen-duration", usage = "Data generation duration in seconds", metaVar = "DATAGENDURATION")
-        private int duration;
-
-        public int getDuration() {
-            return duration;
-        }
-
-        @Option(name = "-qd", aliases = "--querygen-duration", usage = "Query generation duration in seconds", metaVar = "QUERYGENDURATION")
-        private int queryDuration;
-
-        public int getQueryDuration() {
-            return queryDuration;
-        }
-
-        @Option(name = "-regex", aliases = "--regex", usage = "Regular expression used to match experiment names", metaVar = "REGEXP")
-        private String regex;
-
-        public String getRegex() {
-            return regex;
-        }
-
-        @Option(name = "-oh", aliases = "--orchestrator-host", usage = "The host address of THIS orchestrator")
-        private String orchHost;
-
-        public String getOrchestratorHost() {
-            return orchHost;
-        }
-
-        @Option(name = "-op", aliases = "--orchestrator-port", usage = "The port to be used for the orchestrator server of THIS orchestrator")
-        private int orchPort;
-
-        public int getOrchestratorPort() {
-            return orchPort;
-        }
-
-        @Option(name = "-qoh", aliases = "--query-orchestrator-host", usage = "The host address of query orchestrator")
-        private String queryOrchHost;
-
-        public String getQueryOrchestratorHost() {
-            return queryOrchHost;
-        }
-
-        @Option(name = "-qop", aliases = "--query-orchestrator-port", usage = "The port to be used for the orchestrator server of query orchestrator")
-        private int queryOrchPort;
-
-        public int getQueryOrchestratorPort() {
-            return queryOrchPort;
-        }
-
-        @Option(name = "-di", aliases = "--data-interval", usage = " Initial data interval to use when generating data for exp 7")
-        private long dataInterval;
-
-        public long getDataInterval() {
-            return dataInterval;
-        }
-
-        @Option(name = "-ni", aliases = "--num-data-intervals", usage = "Number of data intervals to use when generating data for exp 7")
-        private int numIntervals;
-
-        public int getNIntervals() {
-            return numIntervals;
-        }
-
-        @Option(name = "-sf", aliases = "--stat-file", usage = "Enable IO/CPU stats and place in specified file")
-        private String statFile = null;
-
-        public String getStatFile() {
-            return statFile;
-        }
-
-        @Option(name = "-of", aliases = "--openstreetmap-filepath", usage = "The open street map gps point data file path")
-        private String openStreetMapFilePath;
-
-        public String getOpenStreetMapFilePath() {
-            return openStreetMapFilePath;
-        }
-
-        @Option(name = "-si", aliases = "--location-sample-interval", usage = "Location sample interval from open street map point data")
-        private int locationSampleInterval;
-
-        public int getLocationSampleInterval() {
-            return locationSampleInterval;
-        }
-
-        @Option(name = "-qsf", aliases = "--query-seed-filepath", usage = "The query seed file path")
-        private String querySeedFilePath;
-
-        public String getQuerySeedFilePath() {
-            return querySeedFilePath;
-        }
-
-        @Option(name = "-rcbi", aliases = "--record-count-per-batch-during-ingestion-only", usage = "Record count per batch during ingestion only")
-        private int recordCountPerBatchDuringIngestionOnly = 1000;
-
-        public int getRecordCountPerBatchDuringIngestionOnly() {
-            return recordCountPerBatchDuringIngestionOnly;
-        }
-
-        @Option(name = "-rcbq", aliases = "--record-count-per-batch-during-query", usage = "Record count per batch during query")
-        private int recordCountPerBatchDuringQuery = 1000;
-
-        public int getRecordCountPerBatchDuringQuery() {
-            return recordCountPerBatchDuringQuery;
-        }
-
-        @Option(name = "-dsti", aliases = "--data-gen-sleep-time-during-ingestion-only", usage = "DataGen sleep time in milliseconds after every recordCountPerBatchDuringIngestionOnly records were sent")
-        private long dataGenSleepTimeDuringIngestionOnly = 1;
-
-        public long getDataGenSleepTimeDuringIngestionOnly() {
-            return dataGenSleepTimeDuringIngestionOnly;
-        }
-
-        @Option(name = "-dstq", aliases = "--data-gen-sleep-time-during-query", usage = "DataGen sleep time in milliseconds after every recordCountPerBatchDuringQuery records were sent")
-        private long dataGenSleepTimeDuringQuery = 1;
-
-        public long getDataGenSleepTimeDuringQuery() {
-            return dataGenSleepTimeDuringQuery;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        //        LogManager.getRootLogger().setLevel(org.apache.log4j.Level.OFF);
-        LSMExperimentSetRunnerConfig config = new LSMExperimentSetRunnerConfig(String.valueOf(System
-                .currentTimeMillis()), 3);
-        CmdLineParser clp = new CmdLineParser(config);
-        try {
-            clp.parseArgument(args);
-        } catch (CmdLineException e) {
-            System.err.println(e.getMessage());
-            clp.printUsage(System.err);
-            System.exit(1);
-        }
-
-        Collection<AbstractExperimentBuilder> suite = new ArrayList<>();
-
-        /*
-                suite.add(new Experiment7BBuilder(config));
-                suite.add(new Experiment7DBuilder(config));
-                suite.add(new Experiment7ABuilder(config));
-                suite.add(new Experiment8DBuilder(config));
-                suite.add(new Experiment8ABuilder(config));
-                suite.add(new Experiment8BBuilder(config));
-                suite.add(new Experiment9ABuilder(config));
-                suite.add(new Experiment9DBuilder(config));
-                suite.add(new Experiment9BBuilder(config));
-                suite.add(new Experiment6ABuilder(config));
-                suite.add(new Experiment6BBuilder(config));
-                suite.add(new Experiment6CBuilder(config));
-                suite.add(new Experiment2D1Builder(config));
-                suite.add(new Experiment2D2Builder(config));
-                suite.add(new Experiment2D4Builder(config));
-                suite.add(new Experiment2D8Builder(config));
-                suite.add(new Experiment2C1Builder(config));
-                suite.add(new Experiment2C2Builder(config));
-                suite.add(new Experiment2C4Builder(config));
-                suite.add(new Experiment2C8Builder(config));
-                suite.add(new Experiment2A1Builder(config));
-                suite.add(new Experiment2A2Builder(config));
-                suite.add(new Experiment2A4Builder(config));
-                suite.add(new Experiment2A8Builder(config));
-                suite.add(new Experiment2B1Builder(config));
-                suite.add(new Experiment2B2Builder(config));
-                suite.add(new Experiment2B4Builder(config));
-                suite.add(new Experiment2B8Builder(config));
-                suite.add(new Experiment1ABuilder(config));
-                suite.add(new Experiment1BBuilder(config));
-                suite.add(new Experiment1CBuilder(config));
-                suite.add(new Experiment1DBuilder(config));
-                suite.add(new Experiment1EBuilder(config));
-                suite.add(new Experiment4ABuilder(config));
-                suite.add(new Experiment4BBuilder(config));
-                suite.add(new Experiment4CBuilder(config));
-                suite.add(new Experiment4DBuilder(config));
-                suite.add(new Experiment3ABuilder(config));
-                suite.add(new Experiment3BBuilder(config));
-                suite.add(new Experiment3CBuilder(config));
-                suite.add(new Experiment3DBuilder(config));
-                suite.add(new Experiment5ABuilder(config));
-                suite.add(new Experiment5BBuilder(config));
-                suite.add(new Experiment5CBuilder(config));
-                suite.add(new Experiment5DBuilder(config));
-        */
-                suite.add(new PerfTestAggBuilder(config));
-                suite.add(new PresetClusterPerfBuilder(config));
-
-        Pattern p = config.getRegex() == null ? null : Pattern.compile(config.getRegex());
-
-        SequentialActionList exps = new SequentialActionList();
-        for (AbstractExperimentBuilder eb : suite) {
-            if (p == null || p.matcher(eb.getName()).matches()) {
-                exps.add(eb.build());
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Added " + eb.getName() + " to run list...");
-                }
-            }
-        }
-        exps.perform();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMPerfConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMPerfConstants.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMPerfConstants.java
deleted file mode 100644
index 78483d1..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMPerfConstants.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-public class LSMPerfConstants {
-
-    private LSMPerfConstants(){
-        throw new UnsupportedOperationException();
-    } // never needs to be instantiated
-
-    public static final String CONFIG_DIR = "configs";
-
-    public static final String AQL_DIR = "aql";
-
-    public static final String BASE_DIR = "base";
-
-    public static final String DGEN_DIR = "dgen";
-
-    public static final String LOG_DIR = "log";
-
-    public static final String BASE_TYPES = "base/perf_types.aql";
-
-    public static final String RESULT_FILE = "agg_results.csv";
-
-    public static final String ASTERIX_CONFIGURATION = "asterix-configuration.xml";
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorDGProtocol.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorDGProtocol.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorDGProtocol.java
deleted file mode 100644
index a29a74c..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorDGProtocol.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-public enum OrchestratorDGProtocol {
-    STOPPED,
-    RESUME,
-    REACHED
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer.java
deleted file mode 100644
index a69b0ce..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.experiment.action.base.IAction;
-
-public class OrchestratorServer {
-
-    private static final Logger LOGGER = Logger.getLogger(OrchestratorServer.class.getName());
-
-    private final int port;
-
-    private final int nDataGens;
-
-    private final int nIntervals;
-
-    private final AtomicBoolean running;
-
-    private final IAction[] protocolActions;
-
-    private final boolean flagStopResume;
-
-    public OrchestratorServer(int port, int nDataGens, int nIntervals, IAction[] protocolActions) {
-        this.port = port;
-        this.nDataGens = nDataGens;
-        this.nIntervals = nIntervals;
-        running = new AtomicBoolean();
-        this.protocolActions = protocolActions;
-        this.flagStopResume = true;
-    }
-    
-    public synchronized void start() throws IOException, InterruptedException {
-        final AtomicBoolean bound = new AtomicBoolean();
-        running.set(true);
-        Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    ServerSocket ss = new ServerSocket(port);
-                    synchronized (bound) {
-                        bound.set(true);
-                        bound.notifyAll();
-                    }
-                    Socket[] conn = new Socket[nDataGens];
-                    try {
-                        for (int i = 0; i < nDataGens; i++) {
-                            conn[i] = ss.accept();
-                        }
-                        for (int n = 0; n < nIntervals; ++n) {
-                            //TODO refactor operations according to the protocol message
-                            if (flagStopResume) {
-                                for (int i = 0; i < nDataGens; i++) {
-                                    receiveStopped(conn[i]);
-                                }
-                                protocolActions[n].perform();
-                                if (n != nIntervals - 1) {
-                                    for (int i = 0; i < nDataGens; i++) {
-                                        sendResume(conn[i]);
-                                    }
-                                }
-                            } else {
-                                for (int i = 0; i < nDataGens; i++) {
-                                    receiveReached(conn[i]);
-                                }
-                                protocolActions[n].perform();
-                            }
-                        }
-                    } finally {
-                        for (int i = 0; i < conn.length; ++i) {
-                            if (conn[i] != null) {
-                                conn[i].close();
-                            }
-                        }
-                        ss.close();
-                    }
-                    running.set(false);
-                    synchronized (OrchestratorServer.this) {
-                        OrchestratorServer.this.notifyAll();
-                    }
-                } catch (Throwable t) {
-                    t.printStackTrace();
-                }
-            }
-
-        });
-        t.start();
-        synchronized (bound) {
-            while (!bound.get()) {
-                bound.wait();
-            }
-        }
-    }
-
-    private void sendResume(Socket conn) throws IOException {
-        new DataOutputStream(conn.getOutputStream()).writeInt(OrchestratorDGProtocol.RESUME.ordinal());
-        conn.getOutputStream().flush();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Sent " + OrchestratorDGProtocol.RESUME + " to " + conn.getRemoteSocketAddress());
-        }
-    }
-
-    private void receiveStopped(Socket conn) throws IOException {
-        int msg = new DataInputStream(conn.getInputStream()).readInt();
-        OrchestratorDGProtocol msgType = OrchestratorDGProtocol.values()[msg];
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received " + msgType + " from " + conn.getRemoteSocketAddress());
-        }
-        if (msgType != OrchestratorDGProtocol.STOPPED) {
-            throw new IllegalStateException("Encounted unknown message type " + msgType);
-        }
-    }
-
-    private void receiveReached(Socket conn) throws IOException {
-        int msg = new DataInputStream(conn.getInputStream()).readInt();
-        OrchestratorDGProtocol msgType = OrchestratorDGProtocol.values()[msg];
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received " + msgType + " from " + conn.getRemoteSocketAddress());
-        }
-        if (msgType != OrchestratorDGProtocol.REACHED) {
-            throw new IllegalStateException("Encounted unknown message type " + msgType);
-        }
-
-    }
-
-    public synchronized void awaitFinished() throws InterruptedException {
-        while (running.get()) {
-            wait();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer7.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer7.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer7.java
deleted file mode 100644
index c547393..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer7.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.experiment.action.base.IAction;
-import org.apache.asterix.experiment.builder.AbstractExperiment7Builder;
-
-public class OrchestratorServer7 {
-
-    private static final Logger LOGGER = Logger.getLogger(OrchestratorServer7.class.getName());
-
-    private final int port;
-
-    private final int nDataGens;
-
-    private final int nIntervals;
-
-    private final AtomicBoolean running;
-
-    private final IProtocolActionBuilder protoActionBuilder;
-    
-    private final IAction lsAction;
-
-    private static final int QUERY_TOTAL_COUNT = 2000;
-
-    public OrchestratorServer7(int port, int nDataGens, int nIntervals, IProtocolActionBuilder protoActionBuilder, IAction lsAction) {
-        this.port = port;
-        this.nDataGens = nDataGens;
-        this.nIntervals = nIntervals;
-        running = new AtomicBoolean();
-        this.protoActionBuilder = protoActionBuilder;
-        this.lsAction = lsAction;
-    }
-
-    public synchronized void start() throws IOException, InterruptedException {
-        final AtomicBoolean bound = new AtomicBoolean();
-        running.set(true);
-        Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    ServerSocket ss = new ServerSocket(port);
-                    synchronized (bound) {
-                        bound.set(true);
-                        bound.notifyAll();
-                    }
-                    Socket[] conn = new Socket[nDataGens];
-                    try {
-                        for (int i = 0; i < nDataGens; i++) {
-                            conn[i] = ss.accept();
-                        }
-                        AtomicInteger round = new AtomicInteger();
-                        AtomicBoolean done = new AtomicBoolean(false);
-                        Thread pct = new Thread(new ProtocolConsumer(conn, nIntervals, round, done));
-                        pct.start();
-                        int[] queryType = new int[] { 10, 100, 1000, 10000 };
-                        int type = 0;
-                        //step1. send query when it reaches the query begin round
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Step1 starts");
-                        }
-                        boolean sendQuery = false;
-                        while (!done.get()) {
-                            if (!sendQuery) {
-                                synchronized (round) {
-                                    while (true) {
-                                        if (round.get() >= AbstractExperiment7Builder.QUERY_BEGIN_ROUND) {
-                                            sendQuery = true;
-                                            break;
-                                        }
-                                        round.wait();
-                                    }
-                                }
-                            }
-                            if (sendQuery) {
-                                protoActionBuilder.buildQueryAction(queryType[type % 4], false).perform();
-                                type = (++type) % 4;
-                            }
-
-                        }
-                        pct.join();
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Step1 ends");
-                        }
-                        
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Step2 starts");
-                        }
-                        //step2. send one more round of queries after ingestion is over
-                        protoActionBuilder.buildIOWaitAction().perform();
-                        lsAction.perform();
-                        for (int i = 0; i < QUERY_TOTAL_COUNT; i++) {
-                            protoActionBuilder.buildQueryAction(queryType[i % 4], true).perform();
-                        }
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Step2 ends");
-                        }
-                        
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Step3 starts");
-                        }
-                        //step3. compact dataset
-                        protoActionBuilder.buildCompactAction().perform();
-                        protoActionBuilder.buildIOWaitAction().perform();
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Step3 ends");
-                        }
-                        
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Step4 starts");
-                        }
-                        //step4. send last round of queries after the compaction is over
-                        for (int i = 0; i < QUERY_TOTAL_COUNT; i++) {
-                            protoActionBuilder.buildQueryAction(queryType[i % 4], true).perform();
-                        }
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Step4 ends");
-                        }
-
-                    } finally {
-                        for (int i = 0; i < conn.length; ++i) {
-                            if (conn[i] != null) {
-                                conn[i].close();
-                            }
-                        }
-                        ss.close();
-                    }
-                    running.set(false);
-                    synchronized (OrchestratorServer7.this) {
-                        OrchestratorServer7.this.notifyAll();
-                    }
-                } catch (Throwable t) {
-                    t.printStackTrace();
-                }
-            }
-
-        });
-        t.start();
-        synchronized (bound) {
-            while (!bound.get()) {
-                bound.wait();
-            }
-        }
-    }
-
-    private static class ProtocolConsumer implements Runnable {
-
-        private final Socket[] conn;
-
-        private final int nIntervals;
-
-        private final AtomicInteger interval;
-
-        private final AtomicBoolean done;
-
-        public ProtocolConsumer(Socket[] conn, int nIntervals, AtomicInteger interval, AtomicBoolean done) {
-            this.conn = conn;
-            this.nIntervals = nIntervals;
-            this.interval = interval;
-            this.done = done;
-        }
-
-        @Override
-        public void run() {
-            interval.set(0);
-            try {
-                for (int n = 0; n < nIntervals; ++n) {
-                    for (int i = 0; i < conn.length; i++) {
-                        receiveReached(conn[i]);
-                    }
-                    synchronized (interval) {
-                        interval.getAndIncrement();
-                        interval.notifyAll();
-                    }
-                }
-                done.set(true);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-
-    }
-
-    private static void receiveReached(Socket conn) throws IOException {
-        int msg = new DataInputStream(conn.getInputStream()).readInt();
-        OrchestratorDGProtocol msgType = OrchestratorDGProtocol.values()[msg];
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received " + msgType + " from " + conn.getRemoteSocketAddress());
-        }
-        if (msgType != OrchestratorDGProtocol.REACHED) {
-            throw new IllegalStateException("Encounted unknown message type " + msgType);
-        }
-
-    }
-
-    public synchronized void awaitFinished() throws InterruptedException {
-        while (running.get()) {
-            wait();
-        }
-    }
-
-    public interface IProtocolActionBuilder {
-        public IAction buildQueryAction(long cardinality, boolean finalRound) throws Exception;
-
-        public IAction buildIOWaitAction() throws Exception;
-
-        public IAction buildCompactAction() throws Exception;
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer9.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer9.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer9.java
deleted file mode 100644
index be50e7c..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer9.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.experiment.action.base.IAction;
-
-public class OrchestratorServer9 {
-
-    private static final Logger LOGGER = Logger.getLogger(OrchestratorServer9.class.getName());
-
-    private final int port;
-
-    private final int nDataGens;
-
-    private final int nIntervals;
-
-    private final AtomicBoolean running;
-
-    private final IProtocolActionBuilder protoActionBuilder;
-
-    public OrchestratorServer9(int port, int nDataGens, int nIntervals, IProtocolActionBuilder protoActionBuilder) {
-        this.port = port;
-        this.nDataGens = nDataGens;
-        this.nIntervals = nIntervals;
-        running = new AtomicBoolean();
-        this.protoActionBuilder = protoActionBuilder;
-    }
-
-    public synchronized void start() throws IOException, InterruptedException {
-        final AtomicBoolean bound = new AtomicBoolean();
-        running.set(true);
-        Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    ServerSocket ss = new ServerSocket(port);
-                    synchronized (bound) {
-                        bound.set(true);
-                        bound.notifyAll();
-                    }
-                    Socket[] conn = new Socket[nDataGens];
-                    try {
-                        for (int i = 0; i < nDataGens; i++) {
-                            conn[i] = ss.accept();
-                        }
-                        AtomicInteger round = new AtomicInteger();
-                        AtomicBoolean done = new AtomicBoolean(false);
-                        Thread pct = new Thread(new ProtocolConsumer(conn, nIntervals, round, done));
-                        pct.start();
-                        while (!done.get()) {
-                            protoActionBuilder.buildAction(round.get()).perform();
-                        }
-                        pct.join();
-                    } finally {
-                        for (int i = 0; i < conn.length; ++i) {
-                            if (conn[i] != null) {
-                                conn[i].close();
-                            }
-                        }
-                        ss.close();
-                    }
-                    running.set(false);
-                    synchronized (OrchestratorServer9.this) {
-                        OrchestratorServer9.this.notifyAll();
-                    }
-                } catch (Throwable t) {
-                    t.printStackTrace();
-                }
-            }
-
-        });
-        t.start();
-        synchronized (bound) {
-            while (!bound.get()) {
-                bound.wait();
-            }
-        }
-    }
-
-    private static class ProtocolConsumer implements Runnable {
-
-        private final Socket[] conn;
-
-        private final int nIntervals;
-
-        private final AtomicInteger interval;
-
-        private final AtomicBoolean done;
-
-        public ProtocolConsumer(Socket[] conn, int nIntervals, AtomicInteger interval, AtomicBoolean done) {
-            this.conn = conn;
-            this.nIntervals = nIntervals;
-            this.interval = interval;
-            this.done = done;
-        }
-
-        @Override
-        public void run() {
-            interval.set(0);
-            try {
-                for (int n = 0; n < nIntervals; ++n) {
-                    for (int i = 0; i < conn.length; i++) {
-                        receiveReached(conn[i]);
-                    }
-                    interval.getAndIncrement();
-                }
-                done.set(true);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-
-    }
-
-    private static void receiveReached(Socket conn) throws IOException {
-        int msg = new DataInputStream(conn.getInputStream()).readInt();
-        OrchestratorDGProtocol msgType = OrchestratorDGProtocol.values()[msg];
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received " + msgType + " from " + conn.getRemoteSocketAddress());
-        }
-        if (msgType != OrchestratorDGProtocol.REACHED) {
-            throw new IllegalStateException("Encounted unknown message type " + msgType);
-        }
-
-    }
-
-    public synchronized void awaitFinished() throws InterruptedException {
-        while (running.get()) {
-            wait();
-        }
-    }
-
-    public interface IProtocolActionBuilder {
-        public IAction buildAction(int round) throws Exception;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/RecordCountingServer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/RecordCountingServer.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/RecordCountingServer.java
deleted file mode 100644
index 22e5ac0..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/RecordCountingServer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class RecordCountingServer {
-
-    private final ExecutorService threadPool;
-
-    private final int port;
-
-    private final long duration;
-
-    private ServerSocket ss;
-
-    private boolean stopped;
-
-    private final Object o = new Object();
-
-    final AtomicBoolean b = new AtomicBoolean(false);
-
-    public RecordCountingServer(int port, long duration) {
-        this.port = port;
-        this.duration = duration;
-        threadPool = Executors.newCachedThreadPool();
-    }
-
-    public void start() throws IOException, InterruptedException {
-        Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    stopped = false;
-                    ss = new ServerSocket(port);
-                    while (true) {
-                        Socket s = ss.accept();
-                        if (stopped) {
-                            break;
-                        }
-                        threadPool.execute(new RecordCountingThread(s, duration));
-                        synchronized (o) {
-                            b.set(true);
-                            o.notifyAll();
-                        }
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        });
-        t.start();
-    }
-
-    public void awaitFirstConnection() throws InterruptedException {
-        synchronized (o) {
-            if (!b.get()) {
-                o.wait();
-            }
-        }
-    }
-
-    public void stop() throws IOException, InterruptedException {
-        stopped = true;
-        threadPool.shutdown();
-        threadPool.awaitTermination(1000, TimeUnit.DAYS);
-        ss.close();
-    }
-
-    private static class RecordCountingThread implements Runnable {
-        private final Socket s;
-
-        private final long duration;
-
-        private final char[] buf;
-
-        private int index;
-
-        private int count;
-
-        public RecordCountingThread(Socket s, long duration) {
-            this.s = s;
-            this.duration = duration;
-            buf = new char[32 * 1024];
-        }
-
-        @Override
-        public void run() {
-            count = 0;
-            index = 0;
-            long start = System.currentTimeMillis();
-            try {
-                InputStreamReader r = new InputStreamReader(s.getInputStream());
-                while (System.currentTimeMillis() - start < duration) {
-                    fill(r);
-                    countRecords();
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-            long end = System.currentTimeMillis();
-            System.out.println("Read " + count + " records in " + (end - start) / 1000 + " seconds");
-        }
-
-        private void countRecords() {
-            for (int i = 0; i < index; ++i) {
-                if (buf[i] == '\n') {
-                    ++count;
-                }
-            }
-        }
-
-        private void fill(Reader r) throws IOException {
-            index = 0;
-            int read = r.read(buf);
-            if (read == -1) {
-                index = 0;
-                return;
-            }
-            index += read;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        long duration = Long.parseLong(args[0]);
-        int port1 = Integer.parseInt(args[1]);
-        int port2 = Integer.parseInt(args[2]);
-        RecordCountingServer rcs1 = new RecordCountingServer(port1, duration * 1000);
-        RecordCountingServer rcs2 = new RecordCountingServer(port2, duration * 1000);
-        try {
-            rcs1.start();
-            rcs2.start();
-            rcs1.awaitFirstConnection();
-            rcs2.awaitFirstConnection();
-        } finally {
-            rcs1.stop();
-            rcs2.stop();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketDataGeneratorExecutable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketDataGeneratorExecutable.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketDataGeneratorExecutable.java
deleted file mode 100644
index eeac0b4..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketDataGeneratorExecutable.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.net.Socket;
-import java.util.Collections;
-
-import org.apache.asterix.experiment.action.base.AbstractAction;
-import org.apache.asterix.tools.external.data.TweetGeneratorForSpatialIndexEvaluation;
-
-public class SocketDataGeneratorExecutable extends AbstractAction {
-
-    private final String adapterHost;
-
-    private final int adapterPort;
-
-    public SocketDataGeneratorExecutable(String adapterHost, int adapterPort) {
-        this.adapterHost = adapterHost;
-        this.adapterPort = adapterPort;
-    }
-
-    @Override
-    protected void doPerform() throws Exception {
-        Thread.sleep(4000);
-        Socket s = new Socket(adapterHost, adapterPort);
-        try {
-            TweetGeneratorForSpatialIndexEvaluation tg = new TweetGeneratorForSpatialIndexEvaluation(Collections.<String, String> emptyMap(), 0,
-                    TweetGeneratorForSpatialIndexEvaluation.OUTPUT_FORMAT_ADM_STRING, s.getOutputStream());
-            long start = System.currentTimeMillis();
-            while (tg.setNextRecordBatch(1000)) {
-            }
-            long end = System.currentTimeMillis();
-            long total = end - start;
-            System.out.println("Generation finished: " + tg.getNumFlushedTweets() + " in " + total / 1000 + " seconds");
-        } finally {
-            s.close();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGenerator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGenerator.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGenerator.java
deleted file mode 100644
index f817fc9..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGenerator.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.tools.external.data.TweetGeneratorForSpatialIndexEvaluation;
-import org.apache.commons.lang3.tuple.Pair;
-
-public class SocketTweetGenerator {
-
-    private final ExecutorService threadPool;
-
-    private final int partitionRangeStart;
-
-    private final int dataGenDuration;
-
-    private final int queryGenDuration;
-
-    private final long startDataInterval;
-
-    private final int nDataIntervals;
-
-    private final String orchHost;
-
-    private final int orchPort;
-
-    private final List<Pair<String, Integer>> receiverAddresses;
-
-    private final String openStreetMapFilePath;
-    private final int locationSampleInterval;
-    private final int recordCountPerBatchDuringIngestionOnly;
-    private final int recordCountPerBatchDuringQuery;
-    private final long dataGenSleepTimeDuringIngestionOnly;
-    private final long dataGenSleepTimeDuringQuery;
-
-    private final Mode mode;
-
-    private enum Mode {
-        TIME,
-        DATA
-    }
-
-    public SocketTweetGenerator(SocketTweetGeneratorConfig config) {
-        threadPool = Executors.newCachedThreadPool(new ThreadFactory() {
-
-            private final AtomicInteger count = new AtomicInteger();
-
-            @Override
-            public Thread newThread(Runnable r) {
-                int tid = count.getAndIncrement();
-                Thread t = new Thread(r, "DataGeneratorThread: " + tid);
-                t.setDaemon(true);
-                return t;
-            }
-        });
-
-        partitionRangeStart = config.getPartitionRangeStart();
-        dataGenDuration = config.getDataGenDuration();
-        queryGenDuration = config.getQueryGenDuration();
-        startDataInterval = config.getDataInterval();
-        nDataIntervals = config.getNIntervals();
-        orchHost = config.getOrchestratorHost();
-        orchPort = config.getOrchestratorPort();
-        receiverAddresses = config.getAddresses();
-        mode = startDataInterval > 0 ? Mode.DATA : Mode.TIME;
-        openStreetMapFilePath = config.getOpenStreetMapFilePath();
-        locationSampleInterval = config.getLocationSampleInterval();
-        recordCountPerBatchDuringIngestionOnly = config.getRecordCountPerBatchDuringIngestionOnly();
-        recordCountPerBatchDuringQuery = config.getRecordCountPerBatchDuringQuery();
-        dataGenSleepTimeDuringIngestionOnly = config.getDataGenSleepTimeDuringIngestionOnly();
-        dataGenSleepTimeDuringQuery = config.getDataGenSleepTimeDuringQuery();
-    }
-
-    public void start() throws Exception {
-        final Semaphore sem = new Semaphore((receiverAddresses.size() - 1) * -1);
-        int i = 0;
-        for (Pair<String, Integer> address : receiverAddresses) {
-            threadPool.submit(new DataGenerator(mode, sem, address.getLeft(), address.getRight(), i
-                    + partitionRangeStart, dataGenDuration, queryGenDuration, nDataIntervals, startDataInterval,
-                    orchHost, orchPort, openStreetMapFilePath, locationSampleInterval,
-                    recordCountPerBatchDuringIngestionOnly, recordCountPerBatchDuringQuery,
-                    dataGenSleepTimeDuringIngestionOnly, dataGenSleepTimeDuringQuery));
-            ++i;
-        }
-        sem.acquire();
-    }
-
-    public static class DataGenerator implements Runnable {
-
-        private static final Logger LOGGER = Logger.getLogger(DataGenerator.class.getName());
-
-        private final Mode m;
-        private final Semaphore sem;
-        private final String host;
-        private final int port;
-        private final int partition;
-        private final int dataGenDuration;
-        private final int queryGenDuration;
-        private final int nDataIntervals;
-        private final String orchHost;
-        private final int orchPort;
-
-        private int currentInterval;
-        private long nextStopInterval;
-        private final long dataSizeInterval;
-        private final boolean flagStopResume;
-        private final String openStreetMapFilePath;
-        private final int locationSampleInterval;
-        private final int recordCountPerBatchDuringIngestionOnly;
-        private final int recordCountPerBatchDuringQuery;
-        private final long dataGenSleepTimeDuringIngestionOnly;
-        private final long dataGenSleepTimeDuringQuery;
-
-        public DataGenerator(Mode m, Semaphore sem, String host, int port, int partition, int dataGenDuration,
-                int queryGenDuration, int nDataIntervals, long dataSizeInterval, String orchHost, int orchPort,
-                String openStreetMapFilePath, int locationSampleInterval, int recordCountPerBatchDuringIngestionOnly,
-                int recordCountPerBatchDuringQuery, long dataGenSleepTimeDuringIngestionOnly,
-                long dataGenSleepTimeDuringQuery) {
-            this.m = m;
-            this.sem = sem;
-            this.host = host;
-            this.port = port;
-            this.partition = partition;
-            this.dataGenDuration = dataGenDuration;
-            this.queryGenDuration = queryGenDuration;
-            this.nDataIntervals = nDataIntervals;
-            currentInterval = 0;
-            this.dataSizeInterval = dataSizeInterval;
-            this.nextStopInterval = dataSizeInterval;
-            this.orchHost = orchHost;
-            this.orchPort = orchPort;
-            this.flagStopResume = false;
-            this.openStreetMapFilePath = openStreetMapFilePath;
-            //simple heuristic to generate different data from different data generator.
-            int lsi = locationSampleInterval + (partition + 1) * (partition <= 4 ? 7 : 9);
-            this.locationSampleInterval = lsi;
-            this.recordCountPerBatchDuringIngestionOnly = recordCountPerBatchDuringIngestionOnly;
-            this.recordCountPerBatchDuringQuery = recordCountPerBatchDuringQuery;
-            this.dataGenSleepTimeDuringIngestionOnly = dataGenSleepTimeDuringIngestionOnly;
-            this.dataGenSleepTimeDuringQuery = dataGenSleepTimeDuringQuery;
-        }
-
-        @Override
-        public void run() {
-            LOGGER.info("\nDataGen[" + partition + "] running with the following parameters: \n" + "dataGenDuration : "
-                    + dataGenDuration + "\n" + "queryGenDuration : " + queryGenDuration + "\n" + "nDataIntervals : "
-                    + nDataIntervals + "\n" + "dataSizeInterval : " + dataSizeInterval + "\n"
-                    + "recordCountPerBatchDuringIngestionOnly : " + recordCountPerBatchDuringIngestionOnly + "\n"
-                    + "recordCountPerBatchDuringQuery : " + recordCountPerBatchDuringQuery + "\n"
-                    + "dataGenSleepTimeDuringIngestionOnly : " + dataGenSleepTimeDuringIngestionOnly + "\n"
-                    + "dataGenSleepTimeDuringQuery : " + dataGenSleepTimeDuringQuery + "\n"
-                    + "locationSampleInterval : " + locationSampleInterval);
-
-            try {
-                Socket s = new Socket(host, port);
-                try {
-                    Socket orchSocket = null;
-                    if (m == Mode.DATA && orchHost != null) {
-                        orchSocket = new Socket(orchHost, orchPort);
-                    }
-                    TweetGeneratorForSpatialIndexEvaluation tg = null;
-                    try {
-                        Map<String, String> config = new HashMap<>();
-                        String durationVal = m == Mode.TIME ? String.valueOf(dataGenDuration) : "0";
-                        config.put(TweetGeneratorForSpatialIndexEvaluation.KEY_DURATION, String.valueOf(durationVal));
-                        if (openStreetMapFilePath != null) {
-                            config.put(TweetGeneratorForSpatialIndexEvaluation.KEY_OPENSTREETMAP_FILEPATH,
-                                    openStreetMapFilePath);
-                            config.put(TweetGeneratorForSpatialIndexEvaluation.KEY_LOCATION_SAMPLE_INTERVAL,
-                                    String.valueOf(locationSampleInterval));
-                        }
-                        tg = new TweetGeneratorForSpatialIndexEvaluation(config, partition,
-                                TweetGeneratorForSpatialIndexEvaluation.OUTPUT_FORMAT_ADM_STRING, s.getOutputStream());
-                        long startTS = System.currentTimeMillis();
-                        long prevTS = startTS;
-                        long curTS = startTS;
-                        int round = 0;
-                        while (tg.setNextRecordBatch(recordCountPerBatchDuringIngestionOnly)) {
-                            if (m == Mode.DATA) {
-                                if (tg.getNumFlushedTweets() >= nextStopInterval) {
-                                    //TODO stop/resume option
-                                    if (orchSocket != null) {
-                                        if (flagStopResume) {
-                                            // send stop to orchestrator
-                                            sendStopped(orchSocket);
-                                        } else {
-                                            sendReached(orchSocket);
-                                        }
-                                    }
-
-                                    // update intervals
-                                    // TODO give options: exponential/linear interval
-                                    nextStopInterval += dataSizeInterval;
-                                    if (++currentInterval >= nDataIntervals) {
-                                        break;
-                                    }
-
-                                    if (orchSocket != null) {
-                                        if (flagStopResume) {
-                                            receiveResume(orchSocket);
-                                        }
-                                    }
-                                }
-                            }
-                            curTS = System.currentTimeMillis();
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                round++;
-                                if ((round * recordCountPerBatchDuringIngestionOnly) % 100000 == 0) {
-                                    System.out.println("DataGen[" + partition
-                                            + "][During ingestion only][TimeToInsert100000] " + (curTS - prevTS)
-                                            + " in milliseconds");
-                                    round = 0;
-                                    prevTS = curTS;
-                                }
-                            }
-                            //to prevent congestion in feed pipe line. 
-                            if (dataGenSleepTimeDuringIngestionOnly > 0) {
-                                Thread.sleep(dataGenSleepTimeDuringIngestionOnly);
-                            }
-                        }
-
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("DataGen[" + partition
-                                    + "][During ingestion only][InsertCount] Num tweets flushed = "
-                                    + tg.getNumFlushedTweets() + " in "
-                                    + ((System.currentTimeMillis() - startTS) / 1000) + " seconds from "
-                                    + InetAddress.getLocalHost() + " to " + host + ":" + port);
-                        }
-
-                        if (orchSocket != null && queryGenDuration > 0) {
-                            //wait until orchestrator server's resume message is received.
-                            receiveResume(orchSocket);
-
-                            //reset duration and flushed tweet count
-                            tg.resetDurationAndFlushedTweetCount(queryGenDuration);
-
-                            prevTS = System.currentTimeMillis();
-                            round = 0;
-                            //start sending record
-                            while (tg.setNextRecordBatch(recordCountPerBatchDuringQuery)) {
-                                curTS = System.currentTimeMillis();
-                                if (LOGGER.isLoggable(Level.INFO)) {
-                                    round++;
-                                    if ((round * recordCountPerBatchDuringQuery) % 100000 == 0) {
-                                        System.out.println("DataGen[" + partition
-                                                + "][During ingestion + queries][TimeToInsert100000] "
-                                                + (curTS - prevTS) + " in milliseconds");
-                                        round = 0;
-                                        prevTS = curTS;
-                                    }
-                                }
-                                if (dataGenSleepTimeDuringQuery > 0) {
-                                    Thread.sleep(dataGenSleepTimeDuringQuery);
-                                }
-                            }
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("DataGen[" + partition
-                                        + "][During ingestion + queries][InsertCount] Num tweets flushed = "
-                                        + tg.getNumFlushedTweets() + " in " + queryGenDuration + " seconds from "
-                                        + InetAddress.getLocalHost() + " to " + host + ":" + port);
-                            }
-                            //send reached message to orchestrator server
-                            sendReached(orchSocket);
-                        }
-
-                    } finally {
-                        if (orchSocket != null) {
-                            orchSocket.close();
-                        }
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Num tweets flushed = " + tg.getNumFlushedTweets() + " in " + dataGenDuration
-                                    + " seconds from " + InetAddress.getLocalHost() + " to " + host + ":" + port);
-                        }
-                    }
-                } catch (Throwable t) {
-                    t.printStackTrace();
-                } finally {
-                    s.close();
-                }
-            } catch (Throwable t) {
-                System.err.println("Error connecting to " + host + ":" + port);
-                t.printStackTrace();
-            } finally {
-                sem.release();
-            }
-        }
-
-        private void sendReached(Socket s) throws IOException {
-            new DataOutputStream(s.getOutputStream()).writeInt(OrchestratorDGProtocol.REACHED.ordinal());
-            s.getOutputStream().flush();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Sent " + OrchestratorDGProtocol.REACHED + " to " + s.getRemoteSocketAddress());
-            }
-        }
-
-        private void receiveResume(Socket s) throws IOException {
-            int msg = new DataInputStream(s.getInputStream()).readInt();
-            OrchestratorDGProtocol msgType = OrchestratorDGProtocol.values()[msg];
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Received " + msgType + " from " + s.getRemoteSocketAddress());
-            }
-            if (msgType != OrchestratorDGProtocol.RESUME) {
-                throw new IllegalStateException("Unknown protocol message received: " + msgType);
-            }
-        }
-
-        private void sendStopped(Socket s) throws IOException {
-            new DataOutputStream(s.getOutputStream()).writeInt(OrchestratorDGProtocol.STOPPED.ordinal());
-            s.getOutputStream().flush();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Sent " + OrchestratorDGProtocol.STOPPED + " to " + s.getRemoteSocketAddress());
-            }
-        }
-
-    }
-
-    private static class CircularByteArrayOutputStream extends OutputStream {
-
-        private final byte[] buf;
-
-        private int index;
-
-        public CircularByteArrayOutputStream() {
-            buf = new byte[32 * 1024];
-            index = 0;
-        }
-
-        @Override
-        public void write(byte b[], int off, int len) throws IOException {
-            if (b == null) {
-                throw new NullPointerException();
-            } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
-                throw new IndexOutOfBoundsException();
-            } else if (len == 0) {
-                return;
-            }
-
-            int remain = len;
-            int remainOff = off;
-            while (remain > 0) {
-                int avail = buf.length - index;
-                System.arraycopy(b, remainOff, buf, index, avail);
-                remainOff += avail;
-                remain -= avail;
-                index = (index + avail) % buf.length;
-            }
-        }
-
-        @Override
-        public void write(int b) throws IOException {
-            buf[index] = (byte) b;
-            index = (index + 1) % buf.length;
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorConfig.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorConfig.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorConfig.java
deleted file mode 100644
index d3bb4bd..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorConfig.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.util.List;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.kohsuke.args4j.Argument;
-import org.kohsuke.args4j.CmdLineException;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-import org.kohsuke.args4j.OptionDef;
-import org.kohsuke.args4j.spi.OptionHandler;
-import org.kohsuke.args4j.spi.Parameters;
-import org.kohsuke.args4j.spi.Setter;
-
-public class SocketTweetGeneratorConfig {
-
-    @Option(name = "-p", aliases = "--partition-range-start", usage = "Starting partition number for the set of data generators (default = 0)")
-    private int partitionRangeStart = 0;
-
-    public int getPartitionRangeStart() {
-        return partitionRangeStart;
-    }
-
-    @Option(name = "-d", aliases = { "--datagen-duration" }, usage = "Duration in seconds to run data generation")
-    private int duration = -1;
-
-    public int getDataGenDuration() {
-        return duration;
-    }
-
-    @Option(name = "-qd", aliases = { "--querygen-duration" }, usage = "Duration in seconds to run query generation")
-    private int queryDuration = -1;
-
-    public int getQueryGenDuration() {
-        return queryDuration;
-    }
-
-    @Option(name = "-di", aliases = "--data-interval", usage = "Initial data interval to use when generating data based on data size")
-    private long dataInterval = -1;
-
-    public long getDataInterval() {
-        return dataInterval;
-    }
-
-    @Option(name = "-ni", aliases = "--num-intervals", usage = "Number of intervals to use when generating data based on data size (default = 4)")
-    private int nIntervals = 4;
-
-    public int getNIntervals() {
-        return nIntervals;
-    }
-
-    @Option(name = "-oh", aliases = "--orachestrator-host", usage = "The host name of the orchestrator")
-    private String orchHost;
-
-    public String getOrchestratorHost() {
-        return orchHost;
-    }
-
-    @Option(name = "-op", aliases = "--orchestrator-port", usage = "The port number of the orchestrator")
-    private int orchPort;
-
-    public int getOrchestratorPort() {
-        return orchPort;
-    }
-
-    @Option(name = "-of", aliases = "--openstreetmap-filepath", usage = "The open street map gps point data file path")
-    private String openStreetMapFilePath;
-
-    public String getOpenStreetMapFilePath() {
-        return openStreetMapFilePath;
-    }
-
-    @Option(name = "-si", aliases = "--location-sample-interval", usage = "Location sample interval from open street map point data")
-    private int locationSampleInterval;
-
-    public int getLocationSampleInterval() {
-        return locationSampleInterval;
-    }
-
-    @Option(name = "-rcbi", aliases = "--record-count-per-batch-during-ingestion-only", usage = "Record count per batch during ingestion only")
-    private int recordCountPerBatchDuringIngestionOnly = 1000;
-
-    public int getRecordCountPerBatchDuringIngestionOnly() {
-        return recordCountPerBatchDuringIngestionOnly;
-    }
-
-    @Option(name = "-rcbq", aliases = "--record-count-per-batch-during-query", usage = "Record count per batch during query")
-    private int recordCountPerBatchDuringQuery = 1000;
-
-    public int getRecordCountPerBatchDuringQuery() {
-        return recordCountPerBatchDuringQuery;
-    }
-
-    @Option(name = "-dsti", aliases = "--data-gen-sleep-time-during-ingestion-only", usage = "DataGen sleep time in milliseconds after every recordCountPerBatchDuringIngestionOnly records were sent")
-    private long dataGenSleepTimeDuringIngestionOnly = 1;
-
-    public long getDataGenSleepTimeDuringIngestionOnly() {
-        return dataGenSleepTimeDuringIngestionOnly;
-    }
-
-    @Option(name = "-dstq", aliases = "--data-gen-sleep-time-during-query", usage = "DataGen sleep time in milliseconds after every recordCountPerBatchDuringQuery records were sent")
-    private long dataGenSleepTimeDuringQuery = 1;
-
-    public long getDataGenSleepTimeDuringQuery() {
-        return dataGenSleepTimeDuringQuery;
-    }
-
-    @Argument(required = true, usage = "A list of <ip>:<port> pairs (addresses) to send data to", metaVar = "addresses...", handler = AddressOptionHandler.class)
-    private List<Pair<String, Integer>> addresses;
-
-    public List<Pair<String, Integer>> getAddresses() {
-        return addresses;
-    }
-
-    public static class AddressOptionHandler extends OptionHandler<Pair<String, Integer>> {
-
-        public AddressOptionHandler(CmdLineParser parser, OptionDef option, Setter<? super Pair<String, Integer>> setter) {
-            super(parser, option, setter);
-        }
-
-        @Override
-        public int parseArguments(Parameters params) throws CmdLineException {
-            int counter = 0;
-            while (true) {
-                String param;
-                try {
-                    param = params.getParameter(counter);
-                } catch (CmdLineException ex) {
-                    break;
-                }
-
-                String[] hostPort = param.split(":");
-                if (hostPort.length != 2) {
-                    throw new CmdLineException("Invalid address: " + param + ". Expected <host>:<port>");
-                }
-                Integer port = null;
-                try {
-                    port = Integer.parseInt(hostPort[1]);
-                } catch (NumberFormatException e) {
-                    throw new CmdLineException("Invalid port " + hostPort[1] + " for address " + param + ".");
-                }
-                setter.addValue(Pair.of(hostPort[0], port));
-                counter++;
-            }
-            return counter;
-        }
-
-        @Override
-        public String getDefaultMetaVariable() {
-            return "addresses";
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorDriver.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorDriver.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorDriver.java
deleted file mode 100644
index d64809f..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorDriver.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import org.kohsuke.args4j.CmdLineException;
-import org.kohsuke.args4j.CmdLineParser;
-
-public class SocketTweetGeneratorDriver {
-    public static void main(String[] args) throws Exception {
-        SocketTweetGeneratorConfig clientConfig = new SocketTweetGeneratorConfig();
-        CmdLineParser clp = new CmdLineParser(clientConfig);
-        try {
-            clp.parseArgument(args);
-        } catch (CmdLineException e) {
-            System.err.println(e.getMessage());
-            clp.printUsage(System.err);
-            System.exit(1);
-        }
-
-        if ((clientConfig.getDataInterval() == -1 && clientConfig.getDataGenDuration() == -1)
-                || (clientConfig.getDataInterval() > 0 && clientConfig.getDataGenDuration() > 0)) {
-            System.err.println("Must use exactly one of -d or -di");
-            clp.printUsage(System.err);
-            System.exit(1);
-        }
-
-        SocketTweetGenerator client = new SocketTweetGenerator(clientConfig);
-        client.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SpatialIndexExperiment2OrchestratorServer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SpatialIndexExperiment2OrchestratorServer.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SpatialIndexExperiment2OrchestratorServer.java
deleted file mode 100644
index 4e360dd..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SpatialIndexExperiment2OrchestratorServer.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class SpatialIndexExperiment2OrchestratorServer {
-
-    private static final Logger LOGGER = Logger.getLogger(SpatialIndexExperiment2OrchestratorServer.class.getName());
-
-    private final int dataGenPort;
-
-    private final int queryGenPort;
-
-    private final int nDataGens;
-
-    private final int nQueryGens;
-
-    private final int nIntervals;
-
-    private final AtomicBoolean running;
-
-    public SpatialIndexExperiment2OrchestratorServer(int dataGenPort, int nDataGens, int nIntervals, int queryGenPort,
-            int nQueryGens) {
-        this.dataGenPort = dataGenPort;
-        this.nDataGens = nDataGens;
-        this.queryGenPort = queryGenPort;
-        this.nQueryGens = nQueryGens;
-        this.nIntervals = nIntervals;
-        running = new AtomicBoolean();
-    }
-
-    public synchronized void start() throws IOException, InterruptedException {
-        final AtomicBoolean dataGenBound = new AtomicBoolean();
-        final AtomicBoolean queryGenBound = new AtomicBoolean();
-        running.set(true);
-        Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    ServerSocket dataGenSS = new ServerSocket(dataGenPort);
-                    synchronized (dataGenBound) {
-                        dataGenBound.set(true);
-                        dataGenBound.notifyAll();
-                    }
-                    ServerSocket queryGenSS = new ServerSocket(queryGenPort);
-                    synchronized (queryGenBound) {
-                        queryGenBound.set(true);
-                        queryGenBound.notifyAll();
-                    }
-
-                    Socket[] dataConn = new Socket[nDataGens];
-                    Socket[] queryConn = new Socket[nQueryGens];
-                    try {
-                        //#.wait until all dataGens and queryGens have connected to the orchestrator
-                        for (int i = 0; i < nDataGens; i++) {
-                            dataConn[i] = dataGenSS.accept();
-                        }
-                        for (int i = 0; i < nQueryGens; i++) {
-                            queryConn[i] = queryGenSS.accept();
-                        }
-
-                        //#.wait until queryGens are ready for generating query
-                        for (int i = 0; i < nQueryGens; i++) {
-                            receiveReached(queryConn[i]);
-                        }
-
-                        //#.wait until dataGens are ready for generating data after nIntervals of data were generated 
-                        for (int i = 0; i < nIntervals; i++) {
-                            for (int j = 0; j < nDataGens; j++) {
-                                receiveReached(dataConn[j]);
-                            }
-                        }
-
-                        //#.send signal to queryGens to start sending queries
-                        for (int i = 0; i < nQueryGens; i++) {
-                            sendResume(queryConn[i]);
-                        }
-                        //#.send signal to dataGens to start sending records
-                        for (int i = 0; i < nDataGens; i++) {
-                            sendResume(dataConn[i]);
-                        }
-
-                        //#.wait until both dataGen and queryGen's are done
-                        for (int i = 0; i < nQueryGens; i++) {
-                            receiveReached(queryConn[i]);
-                        }
-                        for (int i = 0; i < nDataGens; i++) {
-                            receiveReached(dataConn[i]);
-                        }
-
-                    } finally {
-                        for (int i = 0; i < nDataGens; ++i) {
-                            if (dataConn[i] != null) {
-                                dataConn[i].close();
-                            }
-                        }
-                        dataGenSS.close();
-                        for (int i = 0; i < nQueryGens; ++i) {
-                            if (queryConn[i] != null) {
-                                queryConn[i].close();
-                            }
-                        }
-                        queryGenSS.close();
-                    }
-                    running.set(false);
-                    synchronized (SpatialIndexExperiment2OrchestratorServer.this) {
-                        SpatialIndexExperiment2OrchestratorServer.this.notifyAll();
-                    }
-                } catch (Throwable t) {
-                    t.printStackTrace();
-                }
-            }
-
-        });
-        t.start();
-        synchronized (dataGenBound) {
-            while (!dataGenBound.get()) {
-                dataGenBound.wait();
-            }
-        }
-        synchronized (queryGenBound) {
-            while (!queryGenBound.get()) {
-                queryGenBound.wait();
-            }    
-        }
-    }
-
-    private static void receiveReached(Socket conn) throws IOException {
-        int msg = new DataInputStream(conn.getInputStream()).readInt();
-        OrchestratorDGProtocol msgType = OrchestratorDGProtocol.values()[msg];
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received " + msgType + " from " + conn.getRemoteSocketAddress());
-        }
-        if (msgType != OrchestratorDGProtocol.REACHED) {
-            throw new IllegalStateException("Encounted unknown message type " + msgType);
-        }
-    }
-
-    private void sendResume(Socket s) throws IOException {
-        new DataOutputStream(s.getOutputStream()).writeInt(OrchestratorDGProtocol.RESUME.ordinal());
-        s.getOutputStream().flush();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Sent " + OrchestratorDGProtocol.RESUME + " to " + s.getRemoteSocketAddress());
-        }
-    }
-
-    public synchronized void awaitFinished() throws InterruptedException {
-        while (running.get()) {
-            wait();
-        }
-    }
-
-}