You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2017/11/30 04:55:54 UTC
[09/14] asterixdb git commit: [NO ISSUE] Delete asterix-experiments
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMExperimentSetRunner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMExperimentSetRunner.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMExperimentSetRunner.java
deleted file mode 100644
index 45073d9..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMExperimentSetRunner.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.regex.Pattern;
-
-import org.apache.asterix.experiment.action.base.SequentialActionList;
-import org.apache.asterix.experiment.builder.AbstractExperimentBuilder;
-import org.apache.asterix.experiment.builder.PerfTestAggBuilder;
-import org.apache.asterix.experiment.builder.PresetClusterPerfBuilder;
-import org.kohsuke.args4j.CmdLineException;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-public class LSMExperimentSetRunner {
-
- private static final Logger LOGGER = Logger.getLogger(LSMExperimentSetRunner.class.getName());
-
- public static class LSMExperimentSetRunnerConfig {
-
- private final String logDirSuffix;
-
- private final int nQueryRuns;
-
- public LSMExperimentSetRunnerConfig(String logDirSuffix, int nQueryRuns) {
- this.logDirSuffix = logDirSuffix;
- this.nQueryRuns = nQueryRuns;
- }
-
- public String getLogDirSuffix() {
- return logDirSuffix;
- }
-
- public int getNQueryRuns() {
- return nQueryRuns;
- }
-
- @Option(name = "-rh", aliases = "--rest-host", usage = "Asterix REST API host address", required = true, metaVar = "HOST")
- private String restHost;
-
- public String getRESTHost() {
- return restHost;
- }
-
- @Option(name = "-rp", aliases = "--rest-port", usage = "Asterix REST API port", required = true, metaVar = "PORT")
- private int restPort;
-
- public int getRESTPort() {
- return restPort;
- }
-
- @Option(name = "-mh", aliases = "--managix-home", usage = "Path to MANAGIX_HOME directory", required = true, metaVar = "MGXHOME")
- private String managixHome;
-
- public String getManagixHome() {
- return managixHome;
- }
-
- @Option(name = "-jh", aliases = "--java-home", usage = "Path to JAVA_HOME directory", required = true, metaVar = "JAVAHOME")
- private String javaHome;
-
- public String getJavaHome() {
- return javaHome;
- }
-
- @Option(name = "-ler", aliases = "--local-experiment-root", usage = "Path to the local LSM experiment root directory", required = true, metaVar = "LOCALEXPROOT")
- private String localExperimentRoot;
-
- public String getLocalExperimentRoot() {
- return localExperimentRoot;
- }
-
- @Option(name = "-u", aliases = "--username", usage = "Username to use for SSH/SCP", required = true, metaVar = "UNAME")
- private String username;
-
- public String getUsername() {
- return username;
- }
-
- @Option(name = "-k", aliases = "--key", usage = "SSH key location", metaVar = "SSHKEY")
- private String sshKeyLocation;
-
- public String getSSHKeyLocation() {
- return sshKeyLocation;
- }
-
- @Option(name = "-d", aliases = "--datagen-duration", usage = "Data generation duration in seconds", metaVar = "DATAGENDURATION")
- private int duration;
-
- public int getDuration() {
- return duration;
- }
-
- @Option(name = "-qd", aliases = "--querygen-duration", usage = "Query generation duration in seconds", metaVar = "QUERYGENDURATION")
- private int queryDuration;
-
- public int getQueryDuration() {
- return queryDuration;
- }
-
- @Option(name = "-regex", aliases = "--regex", usage = "Regular expression used to match experiment names", metaVar = "REGEXP")
- private String regex;
-
- public String getRegex() {
- return regex;
- }
-
- @Option(name = "-oh", aliases = "--orchestrator-host", usage = "The host address of THIS orchestrator")
- private String orchHost;
-
- public String getOrchestratorHost() {
- return orchHost;
- }
-
- @Option(name = "-op", aliases = "--orchestrator-port", usage = "The port to be used for the orchestrator server of THIS orchestrator")
- private int orchPort;
-
- public int getOrchestratorPort() {
- return orchPort;
- }
-
- @Option(name = "-qoh", aliases = "--query-orchestrator-host", usage = "The host address of query orchestrator")
- private String queryOrchHost;
-
- public String getQueryOrchestratorHost() {
- return queryOrchHost;
- }
-
- @Option(name = "-qop", aliases = "--query-orchestrator-port", usage = "The port to be used for the orchestrator server of query orchestrator")
- private int queryOrchPort;
-
- public int getQueryOrchestratorPort() {
- return queryOrchPort;
- }
-
- @Option(name = "-di", aliases = "--data-interval", usage = " Initial data interval to use when generating data for exp 7")
- private long dataInterval;
-
- public long getDataInterval() {
- return dataInterval;
- }
-
- @Option(name = "-ni", aliases = "--num-data-intervals", usage = "Number of data intervals to use when generating data for exp 7")
- private int numIntervals;
-
- public int getNIntervals() {
- return numIntervals;
- }
-
- @Option(name = "-sf", aliases = "--stat-file", usage = "Enable IO/CPU stats and place in specified file")
- private String statFile = null;
-
- public String getStatFile() {
- return statFile;
- }
-
- @Option(name = "-of", aliases = "--openstreetmap-filepath", usage = "The open street map gps point data file path")
- private String openStreetMapFilePath;
-
- public String getOpenStreetMapFilePath() {
- return openStreetMapFilePath;
- }
-
- @Option(name = "-si", aliases = "--location-sample-interval", usage = "Location sample interval from open street map point data")
- private int locationSampleInterval;
-
- public int getLocationSampleInterval() {
- return locationSampleInterval;
- }
-
- @Option(name = "-qsf", aliases = "--query-seed-filepath", usage = "The query seed file path")
- private String querySeedFilePath;
-
- public String getQuerySeedFilePath() {
- return querySeedFilePath;
- }
-
- @Option(name = "-rcbi", aliases = "--record-count-per-batch-during-ingestion-only", usage = "Record count per batch during ingestion only")
- private int recordCountPerBatchDuringIngestionOnly = 1000;
-
- public int getRecordCountPerBatchDuringIngestionOnly() {
- return recordCountPerBatchDuringIngestionOnly;
- }
-
- @Option(name = "-rcbq", aliases = "--record-count-per-batch-during-query", usage = "Record count per batch during query")
- private int recordCountPerBatchDuringQuery = 1000;
-
- public int getRecordCountPerBatchDuringQuery() {
- return recordCountPerBatchDuringQuery;
- }
-
- @Option(name = "-dsti", aliases = "--data-gen-sleep-time-during-ingestion-only", usage = "DataGen sleep time in milliseconds after every recordCountPerBatchDuringIngestionOnly records were sent")
- private long dataGenSleepTimeDuringIngestionOnly = 1;
-
- public long getDataGenSleepTimeDuringIngestionOnly() {
- return dataGenSleepTimeDuringIngestionOnly;
- }
-
- @Option(name = "-dstq", aliases = "--data-gen-sleep-time-during-query", usage = "DataGen sleep time in milliseconds after every recordCountPerBatchDuringQuery records were sent")
- private long dataGenSleepTimeDuringQuery = 1;
-
- public long getDataGenSleepTimeDuringQuery() {
- return dataGenSleepTimeDuringQuery;
- }
- }
-
- public static void main(String[] args) throws Exception {
- // LogManager.getRootLogger().setLevel(org.apache.log4j.Level.OFF);
- LSMExperimentSetRunnerConfig config = new LSMExperimentSetRunnerConfig(String.valueOf(System
- .currentTimeMillis()), 3);
- CmdLineParser clp = new CmdLineParser(config);
- try {
- clp.parseArgument(args);
- } catch (CmdLineException e) {
- System.err.println(e.getMessage());
- clp.printUsage(System.err);
- System.exit(1);
- }
-
- Collection<AbstractExperimentBuilder> suite = new ArrayList<>();
-
- /*
- suite.add(new Experiment7BBuilder(config));
- suite.add(new Experiment7DBuilder(config));
- suite.add(new Experiment7ABuilder(config));
- suite.add(new Experiment8DBuilder(config));
- suite.add(new Experiment8ABuilder(config));
- suite.add(new Experiment8BBuilder(config));
- suite.add(new Experiment9ABuilder(config));
- suite.add(new Experiment9DBuilder(config));
- suite.add(new Experiment9BBuilder(config));
- suite.add(new Experiment6ABuilder(config));
- suite.add(new Experiment6BBuilder(config));
- suite.add(new Experiment6CBuilder(config));
- suite.add(new Experiment2D1Builder(config));
- suite.add(new Experiment2D2Builder(config));
- suite.add(new Experiment2D4Builder(config));
- suite.add(new Experiment2D8Builder(config));
- suite.add(new Experiment2C1Builder(config));
- suite.add(new Experiment2C2Builder(config));
- suite.add(new Experiment2C4Builder(config));
- suite.add(new Experiment2C8Builder(config));
- suite.add(new Experiment2A1Builder(config));
- suite.add(new Experiment2A2Builder(config));
- suite.add(new Experiment2A4Builder(config));
- suite.add(new Experiment2A8Builder(config));
- suite.add(new Experiment2B1Builder(config));
- suite.add(new Experiment2B2Builder(config));
- suite.add(new Experiment2B4Builder(config));
- suite.add(new Experiment2B8Builder(config));
- suite.add(new Experiment1ABuilder(config));
- suite.add(new Experiment1BBuilder(config));
- suite.add(new Experiment1CBuilder(config));
- suite.add(new Experiment1DBuilder(config));
- suite.add(new Experiment1EBuilder(config));
- suite.add(new Experiment4ABuilder(config));
- suite.add(new Experiment4BBuilder(config));
- suite.add(new Experiment4CBuilder(config));
- suite.add(new Experiment4DBuilder(config));
- suite.add(new Experiment3ABuilder(config));
- suite.add(new Experiment3BBuilder(config));
- suite.add(new Experiment3CBuilder(config));
- suite.add(new Experiment3DBuilder(config));
- suite.add(new Experiment5ABuilder(config));
- suite.add(new Experiment5BBuilder(config));
- suite.add(new Experiment5CBuilder(config));
- suite.add(new Experiment5DBuilder(config));
- */
- suite.add(new PerfTestAggBuilder(config));
- suite.add(new PresetClusterPerfBuilder(config));
-
- Pattern p = config.getRegex() == null ? null : Pattern.compile(config.getRegex());
-
- SequentialActionList exps = new SequentialActionList();
- for (AbstractExperimentBuilder eb : suite) {
- if (p == null || p.matcher(eb.getName()).matches()) {
- exps.add(eb.build());
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Added " + eb.getName() + " to run list...");
- }
- }
- }
- exps.perform();
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMPerfConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMPerfConstants.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMPerfConstants.java
deleted file mode 100644
index 78483d1..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMPerfConstants.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-/**
- * String constants shared by the LSM performance experiment client.
- * NOTE(review): the names suggest directory and file names used by the
- * experiments; how they are combined into paths is not visible here.
- */
-public class LSMPerfConstants {
-
-    public static final String CONFIG_DIR = "configs";
-    public static final String AQL_DIR = "aql";
-    public static final String BASE_DIR = "base";
-    public static final String DGEN_DIR = "dgen";
-    public static final String LOG_DIR = "log";
-
-    public static final String BASE_TYPES = "base/perf_types.aql";
-    public static final String RESULT_FILE = "agg_results.csv";
-    public static final String ASTERIX_CONFIGURATION = "asterix-configuration.xml";
-
-    /** Constants holder: construction is always rejected. */
-    private LSMPerfConstants() {
-        throw new UnsupportedOperationException();
-    }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorDGProtocol.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorDGProtocol.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorDGProtocol.java
deleted file mode 100644
index a29a74c..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorDGProtocol.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-/**
- * Message types exchanged over the sockets handled by the orchestrator
- * servers. A message is transmitted as a single int holding the constant's
- * ordinal (written with ordinal(), decoded with values()[n]), so the
- * declaration order below is part of the wire format: do not reorder,
- * insert or remove constants without updating both ends.
- */
-public enum OrchestratorDGProtocol {
- // Expected by OrchestratorServer from every connection before it performs
- // the protocol action of an interval.
- STOPPED,
- // Sent by OrchestratorServer to every connection after the action of an
- // interval, except after the last interval.
- RESUME,
- // Expected by OrchestratorServer7/OrchestratorServer9 from every
- // connection once per interval.
- REACHED
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer.java
deleted file mode 100644
index a69b0ce..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.experiment.action.base.IAction;
-
-/**
- * Socket server that synchronizes a fixed number of client connections with a
- * sequence of protocol actions: for each interval it waits for a message from
- * every connection, performs that interval's action and, in stop/resume mode,
- * tells every connection to resume (except after the last interval).
- *
- * <p>Lifecycle: {@link #start()} binds the port and launches a worker thread;
- * {@link #awaitFinished()} blocks until that worker has terminated, whether
- * normally or because of a failure (failures are logged at SEVERE).
- */
-public class OrchestratorServer {
-
-    private static final Logger LOGGER = Logger.getLogger(OrchestratorServer.class.getName());
-
-    private final int port;
-
-    /** Number of connections accepted before the protocol starts. */
-    private final int nDataGens;
-
-    /** Number of protocol rounds; {@code protocolActions[n]} is performed in round n. */
-    private final int nIntervals;
-
-    /** True from a successful {@link #start()} until the worker thread terminates. */
-    private final AtomicBoolean running;
-
-    private final IAction[] protocolActions;
-
-    /** Selects the STOPPED/RESUME protocol; always true, so the REACHED branch is currently unused. */
-    private final boolean flagStopResume;
-
-    /**
-     * @param port TCP port to listen on
-     * @param nDataGens number of connections to accept
-     * @param nIntervals number of protocol rounds
-     * @param protocolActions action performed in each round, indexed by round
-     */
-    public OrchestratorServer(int port, int nDataGens, int nIntervals, IAction[] protocolActions) {
-        this.port = port;
-        this.nDataGens = nDataGens;
-        this.nIntervals = nIntervals;
-        running = new AtomicBoolean();
-        this.protocolActions = protocolActions;
-        this.flagStopResume = true;
-    }
-
-    /**
-     * Binds the server socket and starts the worker thread that runs the
-     * protocol. Returns once the port is bound.
-     *
-     * <p>The socket is bound on the calling thread so that a bind failure is
-     * reported to the caller; binding inside the worker left the caller
-     * waiting forever when the bind failed.
-     *
-     * @throws IOException if the port cannot be bound
-     * @throws InterruptedException never thrown; kept for signature compatibility
-     */
-    public synchronized void start() throws IOException, InterruptedException {
-        final ServerSocket ss = new ServerSocket(port);
-        running.set(true);
-        Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    serve(ss);
-                } catch (Throwable th) {
-                    LOGGER.log(Level.SEVERE, "Orchestrator server on port " + port + " failed", th);
-                } finally {
-                    // Always release awaitFinished(), also after a failure.
-                    running.set(false);
-                    synchronized (OrchestratorServer.this) {
-                        OrchestratorServer.this.notifyAll();
-                    }
-                }
-            }
-
-        });
-        t.start();
-    }
-
-    /** Accepts all connections, runs every protocol round, then closes all sockets. */
-    private void serve(ServerSocket ss) throws Exception {
-        Socket[] conn = new Socket[nDataGens];
-        try {
-            for (int i = 0; i < nDataGens; i++) {
-                conn[i] = ss.accept();
-            }
-            for (int n = 0; n < nIntervals; ++n) {
-                //TODO refactor operations according to the protocol message
-                if (flagStopResume) {
-                    for (int i = 0; i < nDataGens; i++) {
-                        receiveStopped(conn[i]);
-                    }
-                    protocolActions[n].perform();
-                    // No RESUME after the final round.
-                    if (n != nIntervals - 1) {
-                        for (int i = 0; i < nDataGens; i++) {
-                            sendResume(conn[i]);
-                        }
-                    }
-                } else {
-                    for (int i = 0; i < nDataGens; i++) {
-                        receiveReached(conn[i]);
-                    }
-                    protocolActions[n].perform();
-                }
-            }
-        } finally {
-            for (int i = 0; i < conn.length; ++i) {
-                closeQuietly(conn[i]);
-            }
-            closeQuietly(ss);
-        }
-    }
-
-    /** Closes {@code c} if non-null; a close failure is logged so the remaining resources still get closed. */
-    private static void closeQuietly(java.io.Closeable c) {
-        if (c == null) {
-            return;
-        }
-        try {
-            c.close();
-        } catch (IOException e) {
-            LOGGER.log(Level.WARNING, "Failed to close " + c, e);
-        }
-    }
-
-    /** Writes the RESUME message (its ordinal as one int) to {@code conn} and flushes. */
-    private void sendResume(Socket conn) throws IOException {
-        DataOutputStream out = new DataOutputStream(conn.getOutputStream());
-        out.writeInt(OrchestratorDGProtocol.RESUME.ordinal());
-        out.flush();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Sent " + OrchestratorDGProtocol.RESUME + " to " + conn.getRemoteSocketAddress());
-        }
-    }
-
-    private void receiveStopped(Socket conn) throws IOException {
-        receive(conn, OrchestratorDGProtocol.STOPPED);
-    }
-
-    private void receiveReached(Socket conn) throws IOException {
-        receive(conn, OrchestratorDGProtocol.REACHED);
-    }
-
-    /**
-     * Reads one message (an int holding an enum ordinal) from {@code conn}.
-     *
-     * @throws IllegalStateException if the code is not a valid ordinal or the
-     *         message is not {@code expected}
-     */
-    private void receive(Socket conn, OrchestratorDGProtocol expected) throws IOException {
-        int msg = new DataInputStream(conn.getInputStream()).readInt();
-        OrchestratorDGProtocol[] types = OrchestratorDGProtocol.values();
-        if (msg < 0 || msg >= types.length) {
-            throw new IllegalStateException(
-                    "Encountered unknown message code " + msg + " from " + conn.getRemoteSocketAddress());
-        }
-        OrchestratorDGProtocol msgType = types[msg];
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received " + msgType + " from " + conn.getRemoteSocketAddress());
-        }
-        if (msgType != expected) {
-            throw new IllegalStateException(
-                    "Encountered unexpected message type " + msgType + " (expected " + expected + ")");
-        }
-    }
-
-    /** Blocks until the worker thread started by {@link #start()} has terminated. */
-    public synchronized void awaitFinished() throws InterruptedException {
-        while (running.get()) {
-            wait();
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer7.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer7.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer7.java
deleted file mode 100644
index c547393..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer7.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.experiment.action.base.IAction;
-import org.apache.asterix.experiment.builder.AbstractExperiment7Builder;
-
-/**
- * Orchestrator for experiment 7. While a background consumer counts the
- * REACHED rounds reported by the connected clients, the worker thread:
- * <ol>
- * <li>issues queries from the round
- * {@code AbstractExperiment7Builder.QUERY_BEGIN_ROUND} until all rounds have
- * been reported,</li>
- * <li>waits for I/O, performs {@code lsAction} and issues
- * {@code QUERY_TOTAL_COUNT} final-round queries,</li>
- * <li>compacts the dataset and waits for I/O,</li>
- * <li>issues another {@code QUERY_TOTAL_COUNT} final-round queries.</li>
- * </ol>
- *
- * <p>Lifecycle: {@link #start()} binds the port and launches the worker;
- * {@link #awaitFinished()} blocks until the worker has terminated, whether
- * normally or because of a failure (failures are logged at SEVERE).
- */
-public class OrchestratorServer7 {
-
-    private static final Logger LOGGER = Logger.getLogger(OrchestratorServer7.class.getName());
-
-    /** Number of queries issued in each of steps 2 and 4. */
-    private static final int QUERY_TOTAL_COUNT = 2000;
-
-    /** Values passed as the cardinality argument of buildQueryAction, used round-robin. */
-    private static final int[] QUERY_TYPES = { 10, 100, 1000, 10000 };
-
-    private final int port;
-
-    /** Number of connections accepted before the protocol starts. */
-    private final int nDataGens;
-
-    /** Number of REACHED rounds the consumer waits for. */
-    private final int nIntervals;
-
-    /** True from a successful {@link #start()} until the worker thread terminates. */
-    private final AtomicBoolean running;
-
-    private final IProtocolActionBuilder protoActionBuilder;
-
-    private final IAction lsAction;
-
-    public OrchestratorServer7(int port, int nDataGens, int nIntervals, IProtocolActionBuilder protoActionBuilder, IAction lsAction) {
-        this.port = port;
-        this.nDataGens = nDataGens;
-        this.nIntervals = nIntervals;
-        running = new AtomicBoolean();
-        this.protoActionBuilder = protoActionBuilder;
-        this.lsAction = lsAction;
-    }
-
-    /**
-     * Binds the server socket and starts the worker thread. Returns once the
-     * port is bound.
-     *
-     * <p>The socket is bound on the calling thread so that a bind failure is
-     * reported to the caller; binding inside the worker left the caller
-     * waiting forever when the bind failed.
-     *
-     * @throws IOException if the port cannot be bound
-     * @throws InterruptedException never thrown; kept for signature compatibility
-     */
-    public synchronized void start() throws IOException, InterruptedException {
-        final ServerSocket ss = new ServerSocket(port);
-        running.set(true);
-        Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    serve(ss);
-                } catch (Throwable th) {
-                    LOGGER.log(Level.SEVERE, "Orchestrator server on port " + port + " failed", th);
-                } finally {
-                    // Always release awaitFinished(), also after a failure.
-                    running.set(false);
-                    synchronized (OrchestratorServer7.this) {
-                        OrchestratorServer7.this.notifyAll();
-                    }
-                }
-            }
-
-        });
-        t.start();
-    }
-
-    /** Accepts all connections, runs steps 1-4, then closes all sockets. */
-    private void serve(ServerSocket ss) throws Exception {
-        Socket[] conn = new Socket[nDataGens];
-        try {
-            for (int i = 0; i < nDataGens; i++) {
-                conn[i] = ss.accept();
-            }
-            AtomicInteger round = new AtomicInteger();
-            AtomicBoolean done = new AtomicBoolean(false);
-            ProtocolConsumer consumer = new ProtocolConsumer(conn, nIntervals, round, done);
-            Thread pct = new Thread(consumer);
-            pct.start();
-
-            //step1. send query when it reaches the query begin round
-            LOGGER.info("Step1 starts");
-            int type = 0;
-            boolean sendQuery = false;
-            while (!done.get()) {
-                if (!sendQuery) {
-                    synchronized (round) {
-                        // Also wake up on 'done': the begin round may never be
-                        // reached if there are too few intervals or the consumer fails.
-                        while (round.get() < AbstractExperiment7Builder.QUERY_BEGIN_ROUND && !done.get()) {
-                            round.wait();
-                        }
-                        sendQuery = round.get() >= AbstractExperiment7Builder.QUERY_BEGIN_ROUND;
-                    }
-                }
-                if (sendQuery) {
-                    protoActionBuilder.buildQueryAction(QUERY_TYPES[type], false).perform();
-                    type = (type + 1) % QUERY_TYPES.length;
-                }
-            }
-            pct.join();
-            Exception consumerFailure = consumer.getFailure();
-            if (consumerFailure != null) {
-                // The ingestion rounds were not all observed; the remaining steps would be meaningless.
-                throw new IllegalStateException("Protocol consumer failed; aborting remaining steps",
-                        consumerFailure);
-            }
-            LOGGER.info("Step1 ends");
-
-            //step2. send one more round of queries after ingestion is over
-            LOGGER.info("Step2 starts");
-            protoActionBuilder.buildIOWaitAction().perform();
-            lsAction.perform();
-            for (int i = 0; i < QUERY_TOTAL_COUNT; i++) {
-                protoActionBuilder.buildQueryAction(QUERY_TYPES[i % QUERY_TYPES.length], true).perform();
-            }
-            LOGGER.info("Step2 ends");
-
-            //step3. compact dataset
-            LOGGER.info("Step3 starts");
-            protoActionBuilder.buildCompactAction().perform();
-            protoActionBuilder.buildIOWaitAction().perform();
-            LOGGER.info("Step3 ends");
-
-            //step4. send last round of queries after the compaction is over
-            LOGGER.info("Step4 starts");
-            for (int i = 0; i < QUERY_TOTAL_COUNT; i++) {
-                protoActionBuilder.buildQueryAction(QUERY_TYPES[i % QUERY_TYPES.length], true).perform();
-            }
-            LOGGER.info("Step4 ends");
-        } finally {
-            for (int i = 0; i < conn.length; ++i) {
-                closeQuietly(conn[i]);
-            }
-            closeQuietly(ss);
-        }
-    }
-
-    /** Closes {@code c} if non-null; a close failure is logged so the remaining resources still get closed. */
-    private static void closeQuietly(java.io.Closeable c) {
-        if (c == null) {
-            return;
-        }
-        try {
-            c.close();
-        } catch (IOException e) {
-            LOGGER.log(Level.WARNING, "Failed to close " + c, e);
-        }
-    }
-
-    /**
-     * Reads one REACHED message per connection per interval and publishes the
-     * number of completed intervals through {@code interval}, notifying
-     * waiters on that object. {@code done} is set when the consumer stops,
-     * after success or failure, so that threads polling or waiting on it
-     * cannot hang.
-     */
-    private static class ProtocolConsumer implements Runnable {
-
-        private final Socket[] conn;
-
-        private final int nIntervals;
-
-        private final AtomicInteger interval;
-
-        private final AtomicBoolean done;
-
-        /** Exception that stopped the consumer early, or null. */
-        private volatile Exception failure;
-
-        public ProtocolConsumer(Socket[] conn, int nIntervals, AtomicInteger interval, AtomicBoolean done) {
-            this.conn = conn;
-            this.nIntervals = nIntervals;
-            this.interval = interval;
-            this.done = done;
-        }
-
-        Exception getFailure() {
-            return failure;
-        }
-
-        @Override
-        public void run() {
-            interval.set(0);
-            try {
-                for (int n = 0; n < nIntervals; ++n) {
-                    for (int i = 0; i < conn.length; i++) {
-                        receiveReached(conn[i]);
-                    }
-                    synchronized (interval) {
-                        interval.getAndIncrement();
-                        interval.notifyAll();
-                    }
-                }
-            } catch (Exception e) {
-                failure = e;
-                LOGGER.log(Level.SEVERE, "Protocol consumer failed", e);
-            } finally {
-                // Set under the monitor the orchestrator waits on, so the wake-up cannot be missed.
-                synchronized (interval) {
-                    done.set(true);
-                    interval.notifyAll();
-                }
-            }
-        }
-
-    }
-
-    /**
-     * Reads one message (an int holding an enum ordinal) from {@code conn}
-     * and checks that it is REACHED.
-     *
-     * @throws IllegalStateException if the code is not a valid ordinal or the
-     *         message is not REACHED
-     */
-    private static void receiveReached(Socket conn) throws IOException {
-        int msg = new DataInputStream(conn.getInputStream()).readInt();
-        OrchestratorDGProtocol[] types = OrchestratorDGProtocol.values();
-        if (msg < 0 || msg >= types.length) {
-            throw new IllegalStateException(
-                    "Encountered unknown message code " + msg + " from " + conn.getRemoteSocketAddress());
-        }
-        OrchestratorDGProtocol msgType = types[msg];
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received " + msgType + " from " + conn.getRemoteSocketAddress());
-        }
-        if (msgType != OrchestratorDGProtocol.REACHED) {
-            throw new IllegalStateException("Encountered unexpected message type " + msgType
-                    + " (expected " + OrchestratorDGProtocol.REACHED + ")");
-        }
-    }
-
-    /** Blocks until the worker thread started by {@link #start()} has terminated. */
-    public synchronized void awaitFinished() throws InterruptedException {
-        while (running.get()) {
-            wait();
-        }
-    }
-
-    /** Factory for the actions the orchestrator performs during its four steps. */
-    public interface IProtocolActionBuilder {
-        public IAction buildQueryAction(long cardinality, boolean finalRound) throws Exception;
-
-        public IAction buildIOWaitAction() throws Exception;
-
-        public IAction buildCompactAction() throws Exception;
-
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer9.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer9.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer9.java
deleted file mode 100644
index be50e7c..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer9.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.experiment.action.base.IAction;
-
-/**
- * Orchestrator that keeps performing the action built for the current round
- * while a background consumer counts the REACHED rounds reported by the
- * connected clients.
- *
- * <p>Lifecycle: {@link #start()} binds the port and launches the worker;
- * {@link #awaitFinished()} blocks until the worker has terminated, whether
- * normally or because of a failure (failures are logged at SEVERE).
- */
-public class OrchestratorServer9 {
-
-    private static final Logger LOGGER = Logger.getLogger(OrchestratorServer9.class.getName());
-
-    private final int port;
-
-    /** Number of connections accepted before the protocol starts. */
-    private final int nDataGens;
-
-    /** Number of REACHED rounds the consumer waits for. */
-    private final int nIntervals;
-
-    /** True from a successful {@link #start()} until the worker thread terminates. */
-    private final AtomicBoolean running;
-
-    private final IProtocolActionBuilder protoActionBuilder;
-
-    public OrchestratorServer9(int port, int nDataGens, int nIntervals, IProtocolActionBuilder protoActionBuilder) {
-        this.port = port;
-        this.nDataGens = nDataGens;
-        this.nIntervals = nIntervals;
-        running = new AtomicBoolean();
-        this.protoActionBuilder = protoActionBuilder;
-    }
-
-    /**
-     * Binds the server socket and starts the worker thread. Returns once the
-     * port is bound.
-     *
-     * <p>The socket is bound on the calling thread so that a bind failure is
-     * reported to the caller; binding inside the worker left the caller
-     * waiting forever when the bind failed.
-     *
-     * @throws IOException if the port cannot be bound
-     * @throws InterruptedException never thrown; kept for signature compatibility
-     */
-    public synchronized void start() throws IOException, InterruptedException {
-        final ServerSocket ss = new ServerSocket(port);
-        running.set(true);
-        Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    serve(ss);
-                } catch (Throwable th) {
-                    LOGGER.log(Level.SEVERE, "Orchestrator server on port " + port + " failed", th);
-                } finally {
-                    // Always release awaitFinished(), also after a failure.
-                    running.set(false);
-                    synchronized (OrchestratorServer9.this) {
-                        OrchestratorServer9.this.notifyAll();
-                    }
-                }
-            }
-
-        });
-        t.start();
-    }
-
-    /** Accepts all connections, performs actions until every round is reported, then closes all sockets. */
-    private void serve(ServerSocket ss) throws Exception {
-        Socket[] conn = new Socket[nDataGens];
-        try {
-            for (int i = 0; i < nDataGens; i++) {
-                conn[i] = ss.accept();
-            }
-            AtomicInteger round = new AtomicInteger();
-            AtomicBoolean done = new AtomicBoolean(false);
-            ProtocolConsumer consumer = new ProtocolConsumer(conn, nIntervals, round, done);
-            Thread pct = new Thread(consumer);
-            pct.start();
-            while (!done.get()) {
-                protoActionBuilder.buildAction(round.get()).perform();
-            }
-            pct.join();
-            Exception consumerFailure = consumer.getFailure();
-            if (consumerFailure != null) {
-                // Surface the failure instead of reporting a clean finish.
-                throw new IllegalStateException("Protocol consumer failed", consumerFailure);
-            }
-        } finally {
-            for (int i = 0; i < conn.length; ++i) {
-                closeQuietly(conn[i]);
-            }
-            closeQuietly(ss);
-        }
-    }
-
-    /** Closes {@code c} if non-null; a close failure is logged so the remaining resources still get closed. */
-    private static void closeQuietly(java.io.Closeable c) {
-        if (c == null) {
-            return;
-        }
-        try {
-            c.close();
-        } catch (IOException e) {
-            LOGGER.log(Level.WARNING, "Failed to close " + c, e);
-        }
-    }
-
-    /**
-     * Reads one REACHED message per connection per interval and publishes the
-     * number of completed intervals through {@code interval}. {@code done} is
-     * set when the consumer stops, after success or failure, so that the
-     * thread polling it cannot loop forever.
-     */
-    private static class ProtocolConsumer implements Runnable {
-
-        private final Socket[] conn;
-
-        private final int nIntervals;
-
-        private final AtomicInteger interval;
-
-        private final AtomicBoolean done;
-
-        /** Exception that stopped the consumer early, or null. */
-        private volatile Exception failure;
-
-        public ProtocolConsumer(Socket[] conn, int nIntervals, AtomicInteger interval, AtomicBoolean done) {
-            this.conn = conn;
-            this.nIntervals = nIntervals;
-            this.interval = interval;
-            this.done = done;
-        }
-
-        Exception getFailure() {
-            return failure;
-        }
-
-        @Override
-        public void run() {
-            interval.set(0);
-            try {
-                for (int n = 0; n < nIntervals; ++n) {
-                    for (int i = 0; i < conn.length; i++) {
-                        receiveReached(conn[i]);
-                    }
-                    interval.getAndIncrement();
-                }
-            } catch (Exception e) {
-                failure = e;
-                LOGGER.log(Level.SEVERE, "Protocol consumer failed", e);
-            } finally {
-                done.set(true);
-            }
-        }
-
-    }
-
-    /**
-     * Reads one message (an int holding an enum ordinal) from {@code conn}
-     * and checks that it is REACHED.
-     *
-     * @throws IllegalStateException if the code is not a valid ordinal or the
-     *         message is not REACHED
-     */
-    private static void receiveReached(Socket conn) throws IOException {
-        int msg = new DataInputStream(conn.getInputStream()).readInt();
-        OrchestratorDGProtocol[] types = OrchestratorDGProtocol.values();
-        if (msg < 0 || msg >= types.length) {
-            throw new IllegalStateException(
-                    "Encountered unknown message code " + msg + " from " + conn.getRemoteSocketAddress());
-        }
-        OrchestratorDGProtocol msgType = types[msg];
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received " + msgType + " from " + conn.getRemoteSocketAddress());
-        }
-        if (msgType != OrchestratorDGProtocol.REACHED) {
-            throw new IllegalStateException("Encountered unexpected message type " + msgType
-                    + " (expected " + OrchestratorDGProtocol.REACHED + ")");
-        }
-    }
-
-    /** Blocks until the worker thread started by {@link #start()} has terminated. */
-    public synchronized void awaitFinished() throws InterruptedException {
-        while (running.get()) {
-            wait();
-        }
-    }
-
-    /** Factory for the action performed repeatedly while rounds are being reported. */
-    public interface IProtocolActionBuilder {
-        public IAction buildAction(int round) throws Exception;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/RecordCountingServer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/RecordCountingServer.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/RecordCountingServer.java
deleted file mode 100644
index 22e5ac0..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/RecordCountingServer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Small TCP server that accepts connections on one port and, for each connection, counts the
- * newline characters received for a bounded amount of time.
- */
-public class RecordCountingServer {
-
-    private final ExecutorService threadPool;
-
-    private final int port;
-
-    // Per-connection counting time in milliseconds (compared against currentTimeMillis deltas).
-    private final long duration;
-
-    // Written by start()/stop() and read by the acceptor thread, hence volatile.
-    private volatile ServerSocket ss;
-
-    private volatile boolean stopped;
-
-    // Monitor used to signal that the first connection has been accepted.
-    private final Object o = new Object();
-
-    // Becomes true once at least one connection has been accepted.
-    final AtomicBoolean b = new AtomicBoolean(false);
-
-    /**
-     * @param port     port to listen on
-     * @param duration how long each connection is read, in milliseconds
-     */
-    public RecordCountingServer(int port, long duration) {
-        this.port = port;
-        this.duration = duration;
-        threadPool = Executors.newCachedThreadPool();
-    }
-
-    /**
-     * Binds the server socket and starts a background thread that accepts connections and hands
-     * each one to the worker pool.
-     *
-     * @throws IOException if the port cannot be bound
-     */
-    public void start() throws IOException, InterruptedException {
-        stopped = false;
-        // Bind on the caller's thread so a bind failure is reported to the caller instead of
-        // only being printed by the acceptor thread while awaitFirstConnection() hangs.
-        final ServerSocket server = new ServerSocket(port);
-        ss = server;
-        Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    while (true) {
-                        Socket s = server.accept();
-                        if (stopped) {
-                            s.close();
-                            break;
-                        }
-                        threadPool.execute(new RecordCountingThread(s, duration));
-                        synchronized (o) {
-                            b.set(true);
-                            o.notifyAll();
-                        }
-                    }
-                } catch (Exception e) {
-                    // stop() closes the server socket to break out of accept(); that is the
-                    // expected way for this thread to end and is not worth a stack trace.
-                    if (!stopped) {
-                        e.printStackTrace();
-                    }
-                }
-            }
-        });
-        t.start();
-    }
-
-    /**
-     * Blocks until at least one connection has been accepted.
-     *
-     * @throws InterruptedException if the waiting thread is interrupted
-     */
-    public void awaitFirstConnection() throws InterruptedException {
-        synchronized (o) {
-            // Loop to guard against spurious wakeups.
-            while (!b.get()) {
-                o.wait();
-            }
-        }
-    }
-
-    /**
-     * Stops accepting connections and waits for the running workers to finish. Safe to call
-     * even if {@link #start()} failed before the server socket was created.
-     *
-     * @throws IOException          if closing the server socket fails
-     * @throws InterruptedException if interrupted while waiting for the workers
-     */
-    public void stop() throws IOException, InterruptedException {
-        stopped = true;
-        try {
-            ServerSocket server = ss;
-            if (server != null) {
-                // Closing first unblocks the acceptor thread waiting in accept().
-                server.close();
-            }
-        } finally {
-            threadPool.shutdown();
-            threadPool.awaitTermination(1000, TimeUnit.DAYS);
-        }
-    }
-
-    /**
-     * Reads characters from one connection and counts newlines until the configured duration
-     * has elapsed or the peer closes the stream.
-     */
-    private static class RecordCountingThread implements Runnable {
-        private final Socket s;
-
-        private final long duration;
-
-        private final char[] buf;
-
-        // Number of valid characters currently in buf.
-        private int index;
-
-        private int count;
-
-        public RecordCountingThread(Socket s, long duration) {
-            this.s = s;
-            this.duration = duration;
-            buf = new char[32 * 1024];
-        }
-
-        @Override
-        public void run() {
-            count = 0;
-            index = 0;
-            long start = System.currentTimeMillis();
-            try {
-                try {
-                    InputStreamReader r = new InputStreamReader(s.getInputStream());
-                    // Stop at end of stream instead of spinning until the duration elapses.
-                    while (System.currentTimeMillis() - start < duration && fill(r)) {
-                        countRecords();
-                    }
-                } finally {
-                    s.close();
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-            long end = System.currentTimeMillis();
-            System.out.println("Read " + count + " records in " + (end - start) / 1000 + " seconds");
-        }
-
-        /** Adds the number of newline characters in the filled part of the buffer to count. */
-        private void countRecords() {
-            for (int i = 0; i < index; ++i) {
-                if (buf[i] == '\n') {
-                    ++count;
-                }
-            }
-        }
-
-        /**
-         * Refills the buffer from the reader.
-         *
-         * @return {@code false} if the end of the stream was reached, {@code true} otherwise
-         */
-        private boolean fill(Reader r) throws IOException {
-            int read = r.read(buf);
-            if (read == -1) {
-                index = 0;
-                return false;
-            }
-            index = read;
-            return true;
-        }
-    }
-
-    /**
-     * Starts two servers, waits until each has seen a connection, then stops both.
-     * Arguments: duration in seconds, first port, second port.
-     */
-    public static void main(String[] args) throws Exception {
-        long duration = Long.parseLong(args[0]);
-        int port1 = Integer.parseInt(args[1]);
-        int port2 = Integer.parseInt(args[2]);
-        RecordCountingServer rcs1 = new RecordCountingServer(port1, duration * 1000);
-        RecordCountingServer rcs2 = new RecordCountingServer(port2, duration * 1000);
-        try {
-            rcs1.start();
-            rcs2.start();
-            rcs1.awaitFirstConnection();
-            rcs2.awaitFirstConnection();
-        } finally {
-            // Make sure the second server is stopped even if stopping the first one fails.
-            try {
-                rcs1.stop();
-            } finally {
-                rcs2.stop();
-            }
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketDataGeneratorExecutable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketDataGeneratorExecutable.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketDataGeneratorExecutable.java
deleted file mode 100644
index eeac0b4..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketDataGeneratorExecutable.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.net.Socket;
-import java.util.Collections;
-
-import org.apache.asterix.experiment.action.base.AbstractAction;
-import org.apache.asterix.tools.external.data.TweetGeneratorForSpatialIndexEvaluation;
-
-/**
- * Action that connects to an adapter socket and pushes generated tweets to it until the
- * generator reports that no further batch is available.
- */
-public class SocketDataGeneratorExecutable extends AbstractAction {
-
-    private final String adapterHost;
-
-    private final int adapterPort;
-
-    /**
-     * @param adapterHost host to connect to
-     * @param adapterPort port to connect to
-     */
-    public SocketDataGeneratorExecutable(String adapterHost, int adapterPort) {
-        this.adapterPort = adapterPort;
-        this.adapterHost = adapterHost;
-    }
-
-    /**
-     * Sleeps four seconds, opens the socket, sends batches of 1000 records until the generator
-     * returns false, prints a summary, and always closes the socket.
-     */
-    @Override
-    protected void doPerform() throws Exception {
-        Thread.sleep(4000);
-        Socket socket = new Socket(adapterHost, adapterPort);
-        try {
-            TweetGeneratorForSpatialIndexEvaluation generator = new TweetGeneratorForSpatialIndexEvaluation(
-                    Collections.<String, String> emptyMap(), 0,
-                    TweetGeneratorForSpatialIndexEvaluation.OUTPUT_FORMAT_ADM_STRING, socket.getOutputStream());
-            long startMillis = System.currentTimeMillis();
-            boolean more;
-            do {
-                more = generator.setNextRecordBatch(1000);
-            } while (more);
-            long elapsedMillis = System.currentTimeMillis() - startMillis;
-            System.out.println("Generation finished: " + generator.getNumFlushedTweets() + " in "
-                    + elapsedMillis / 1000 + " seconds");
-        } finally {
-            socket.close();
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGenerator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGenerator.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGenerator.java
deleted file mode 100644
index f817fc9..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGenerator.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.tools.external.data.TweetGeneratorForSpatialIndexEvaluation;
-import org.apache.commons.lang3.tuple.Pair;
-
-public class SocketTweetGenerator {
-
- private final ExecutorService threadPool;
-
- private final int partitionRangeStart;
-
- private final int dataGenDuration;
-
- private final int queryGenDuration;
-
- private final long startDataInterval;
-
- private final int nDataIntervals;
-
- private final String orchHost;
-
- private final int orchPort;
-
- private final List<Pair<String, Integer>> receiverAddresses;
-
- private final String openStreetMapFilePath;
- private final int locationSampleInterval;
- private final int recordCountPerBatchDuringIngestionOnly;
- private final int recordCountPerBatchDuringQuery;
- private final long dataGenSleepTimeDuringIngestionOnly;
- private final long dataGenSleepTimeDuringQuery;
-
- private final Mode mode;
-
- /**
-  * How a data generator bounds its ingestion phase. The enclosing constructor picks
-  * {@code DATA} when the configured data interval is greater than zero and {@code TIME}
-  * otherwise.
-  */
- private enum Mode {
- // The generator is configured with the data generation duration.
- TIME,
- // The generator is configured with duration "0" and stops based on flushed tweet counts.
- DATA
- }
-
- /**
-  * Creates the generator: sets up a cached pool of daemon threads named
-  * "DataGeneratorThread: N" and copies all settings out of the given configuration.
-  *
-  * @param config parsed command line configuration
-  */
- public SocketTweetGenerator(SocketTweetGeneratorConfig config) {
-     final AtomicInteger threadCounter = new AtomicInteger();
-     ThreadFactory daemonFactory = new ThreadFactory() {
-
-         @Override
-         public Thread newThread(Runnable r) {
-             Thread worker = new Thread(r, "DataGeneratorThread: " + threadCounter.getAndIncrement());
-             worker.setDaemon(true);
-             return worker;
-         }
-     };
-     threadPool = Executors.newCachedThreadPool(daemonFactory);
-
-     partitionRangeStart = config.getPartitionRangeStart();
-     dataGenDuration = config.getDataGenDuration();
-     queryGenDuration = config.getQueryGenDuration();
-     startDataInterval = config.getDataInterval();
-     nDataIntervals = config.getNIntervals();
-     orchHost = config.getOrchestratorHost();
-     orchPort = config.getOrchestratorPort();
-     receiverAddresses = config.getAddresses();
-     // A positive data interval selects size-based generation, anything else time-based.
-     if (startDataInterval > 0) {
-         mode = Mode.DATA;
-     } else {
-         mode = Mode.TIME;
-     }
-     openStreetMapFilePath = config.getOpenStreetMapFilePath();
-     locationSampleInterval = config.getLocationSampleInterval();
-     recordCountPerBatchDuringIngestionOnly = config.getRecordCountPerBatchDuringIngestionOnly();
-     recordCountPerBatchDuringQuery = config.getRecordCountPerBatchDuringQuery();
-     dataGenSleepTimeDuringIngestionOnly = config.getDataGenSleepTimeDuringIngestionOnly();
-     dataGenSleepTimeDuringQuery = config.getDataGenSleepTimeDuringQuery();
- }
-
- public void start() throws Exception {
- final Semaphore sem = new Semaphore((receiverAddresses.size() - 1) * -1);
- int i = 0;
- for (Pair<String, Integer> address : receiverAddresses) {
- threadPool.submit(new DataGenerator(mode, sem, address.getLeft(), address.getRight(), i
- + partitionRangeStart, dataGenDuration, queryGenDuration, nDataIntervals, startDataInterval,
- orchHost, orchPort, openStreetMapFilePath, locationSampleInterval,
- recordCountPerBatchDuringIngestionOnly, recordCountPerBatchDuringQuery,
- dataGenSleepTimeDuringIngestionOnly, dataGenSleepTimeDuringQuery));
- ++i;
- }
- sem.acquire();
- }
-
- public static class DataGenerator implements Runnable {
-
- private static final Logger LOGGER = Logger.getLogger(DataGenerator.class.getName());
-
- private final Mode m;
- private final Semaphore sem;
- private final String host;
- private final int port;
- private final int partition;
- private final int dataGenDuration;
- private final int queryGenDuration;
- private final int nDataIntervals;
- private final String orchHost;
- private final int orchPort;
-
- private int currentInterval;
- private long nextStopInterval;
- private final long dataSizeInterval;
- private final boolean flagStopResume;
- private final String openStreetMapFilePath;
- private final int locationSampleInterval;
- private final int recordCountPerBatchDuringIngestionOnly;
- private final int recordCountPerBatchDuringQuery;
- private final long dataGenSleepTimeDuringIngestionOnly;
- private final long dataGenSleepTimeDuringQuery;
-
- /**
-  * Stores the settings for one generator. The stop/resume flag is fixed to false, the first
-  * stop threshold equals {@code dataSizeInterval}, and the location sample interval is
-  * offset per partition so that different generators produce different data.
-  */
- public DataGenerator(Mode m, Semaphore sem, String host, int port, int partition, int dataGenDuration,
-         int queryGenDuration, int nDataIntervals, long dataSizeInterval, String orchHost, int orchPort,
-         String openStreetMapFilePath, int locationSampleInterval, int recordCountPerBatchDuringIngestionOnly,
-         int recordCountPerBatchDuringQuery, long dataGenSleepTimeDuringIngestionOnly,
-         long dataGenSleepTimeDuringQuery) {
-     // connection and coordination
-     this.m = m;
-     this.sem = sem;
-     this.host = host;
-     this.port = port;
-     this.orchHost = orchHost;
-     this.orchPort = orchPort;
-     this.partition = partition;
-
-     // durations and interval bookkeeping
-     this.dataGenDuration = dataGenDuration;
-     this.queryGenDuration = queryGenDuration;
-     this.nDataIntervals = nDataIntervals;
-     this.dataSizeInterval = dataSizeInterval;
-     this.nextStopInterval = dataSizeInterval;
-     this.currentInterval = 0;
-     this.flagStopResume = false;
-
-     // location sampling: simple heuristic to generate different data from different data generators
-     this.openStreetMapFilePath = openStreetMapFilePath;
-     int stride;
-     if (partition <= 4) {
-         stride = 7;
-     } else {
-         stride = 9;
-     }
-     this.locationSampleInterval = locationSampleInterval + (partition + 1) * stride;
-
-     // batching and throttling
-     this.recordCountPerBatchDuringIngestionOnly = recordCountPerBatchDuringIngestionOnly;
-     this.recordCountPerBatchDuringQuery = recordCountPerBatchDuringQuery;
-     this.dataGenSleepTimeDuringIngestionOnly = dataGenSleepTimeDuringIngestionOnly;
-     this.dataGenSleepTimeDuringQuery = dataGenSleepTimeDuringQuery;
- }
-
- /**
-  * Connects to the receiver at {@code host:port} and streams generated tweets to it.
-  * <p>
-  * Ingestion phase: batches of {@code recordCountPerBatchDuringIngestionOnly} records are sent
-  * until the generator reports no further batch. In DATA mode a REACHED message is sent to the
-  * orchestrator (if connected) every time the flushed tweet count passes the next threshold,
-  * and the phase ends after {@code nDataIntervals} thresholds.
-  * <p>
-  * Query phase (only with an orchestrator connection and a positive {@code queryGenDuration}):
-  * waits for RESUME, resets the generator with {@code queryGenDuration}, sends batches of
-  * {@code recordCountPerBatchDuringQuery} records, then sends REACHED.
-  * <p>
-  * Failures are printed, never rethrown; the shared semaphore is always released.
-  */
- @Override
- public void run() {
-     LOGGER.info("\nDataGen[" + partition + "] running with the following parameters: \n" + "dataGenDuration : "
-             + dataGenDuration + "\n" + "queryGenDuration : " + queryGenDuration + "\n" + "nDataIntervals : "
-             + nDataIntervals + "\n" + "dataSizeInterval : " + dataSizeInterval + "\n"
-             + "recordCountPerBatchDuringIngestionOnly : " + recordCountPerBatchDuringIngestionOnly + "\n"
-             + "recordCountPerBatchDuringQuery : " + recordCountPerBatchDuringQuery + "\n"
-             + "dataGenSleepTimeDuringIngestionOnly : " + dataGenSleepTimeDuringIngestionOnly + "\n"
-             + "dataGenSleepTimeDuringQuery : " + dataGenSleepTimeDuringQuery + "\n"
-             + "locationSampleInterval : " + locationSampleInterval);
-
-     try {
-         Socket s = new Socket(host, port);
-         try {
-             Socket orchSocket = null;
-             if (m == Mode.DATA && orchHost != null) {
-                 orchSocket = new Socket(orchHost, orchPort);
-             }
-             TweetGeneratorForSpatialIndexEvaluation tg = null;
-             try {
-                 Map<String, String> config = new HashMap<>();
-                 String durationVal = m == Mode.TIME ? String.valueOf(dataGenDuration) : "0";
-                 config.put(TweetGeneratorForSpatialIndexEvaluation.KEY_DURATION, durationVal);
-                 if (openStreetMapFilePath != null) {
-                     config.put(TweetGeneratorForSpatialIndexEvaluation.KEY_OPENSTREETMAP_FILEPATH,
-                             openStreetMapFilePath);
-                     config.put(TweetGeneratorForSpatialIndexEvaluation.KEY_LOCATION_SAMPLE_INTERVAL,
-                             String.valueOf(locationSampleInterval));
-                 }
-                 tg = new TweetGeneratorForSpatialIndexEvaluation(config, partition,
-                         TweetGeneratorForSpatialIndexEvaluation.OUTPUT_FORMAT_ADM_STRING, s.getOutputStream());
-                 long startTS = System.currentTimeMillis();
-                 long prevTS = startTS;
-                 long curTS = startTS;
-                 int round = 0;
-                 while (tg.setNextRecordBatch(recordCountPerBatchDuringIngestionOnly)) {
-                     if (m == Mode.DATA) {
-                         if (tg.getNumFlushedTweets() >= nextStopInterval) {
-                             //TODO stop/resume option
-                             if (orchSocket != null) {
-                                 if (flagStopResume) {
-                                     // send stop to orchestrator
-                                     sendStopped(orchSocket);
-                                 } else {
-                                     sendReached(orchSocket);
-                                 }
-                             }
-
-                             // update intervals
-                             // TODO give options: exponential/linear interval
-                             nextStopInterval += dataSizeInterval;
-                             if (++currentInterval >= nDataIntervals) {
-                                 break;
-                             }
-
-                             if (orchSocket != null) {
-                                 if (flagStopResume) {
-                                     receiveResume(orchSocket);
-                                 }
-                             }
-                         }
-                     }
-                     curTS = System.currentTimeMillis();
-                     if (LOGGER.isLoggable(Level.INFO)) {
-                         round++;
-                         if ((round * recordCountPerBatchDuringIngestionOnly) % 100000 == 0) {
-                             System.out.println("DataGen[" + partition
-                                     + "][During ingestion only][TimeToInsert100000] " + (curTS - prevTS)
-                                     + " in milliseconds");
-                             round = 0;
-                             prevTS = curTS;
-                         }
-                     }
-                     // throttle to prevent congestion in the feed pipeline
-                     if (dataGenSleepTimeDuringIngestionOnly > 0) {
-                         Thread.sleep(dataGenSleepTimeDuringIngestionOnly);
-                     }
-                 }
-
-                 if (LOGGER.isLoggable(Level.INFO)) {
-                     LOGGER.info("DataGen[" + partition
-                             + "][During ingestion only][InsertCount] Num tweets flushed = "
-                             + tg.getNumFlushedTweets() + " in "
-                             + ((System.currentTimeMillis() - startTS) / 1000) + " seconds from "
-                             + InetAddress.getLocalHost() + " to " + host + ":" + port);
-                 }
-
-                 if (orchSocket != null && queryGenDuration > 0) {
-                     // wait until the orchestrator server's resume message is received
-                     receiveResume(orchSocket);
-
-                     // reset duration and flushed tweet count
-                     tg.resetDurationAndFlushedTweetCount(queryGenDuration);
-
-                     prevTS = System.currentTimeMillis();
-                     round = 0;
-                     // start sending records
-                     while (tg.setNextRecordBatch(recordCountPerBatchDuringQuery)) {
-                         curTS = System.currentTimeMillis();
-                         if (LOGGER.isLoggable(Level.INFO)) {
-                             round++;
-                             if ((round * recordCountPerBatchDuringQuery) % 100000 == 0) {
-                                 System.out.println("DataGen[" + partition
-                                         + "][During ingestion + queries][TimeToInsert100000] "
-                                         + (curTS - prevTS) + " in milliseconds");
-                                 round = 0;
-                                 prevTS = curTS;
-                             }
-                         }
-                         if (dataGenSleepTimeDuringQuery > 0) {
-                             Thread.sleep(dataGenSleepTimeDuringQuery);
-                         }
-                     }
-                     if (LOGGER.isLoggable(Level.INFO)) {
-                         LOGGER.info("DataGen[" + partition
-                                 + "][During ingestion + queries][InsertCount] Num tweets flushed = "
-                                 + tg.getNumFlushedTweets() + " in " + queryGenDuration + " seconds from "
-                                 + InetAddress.getLocalHost() + " to " + host + ":" + port);
-                     }
-                     // send reached message to orchestrator server
-                     sendReached(orchSocket);
-                 }
-
-             } finally {
-                 if (orchSocket != null) {
-                     orchSocket.close();
-                 }
-                 // tg is still null when constructing the generator failed; dereferencing it
-                 // here would replace the real failure with a NullPointerException.
-                 if (tg != null && LOGGER.isLoggable(Level.INFO)) {
-                     LOGGER.info("Num tweets flushed = " + tg.getNumFlushedTweets() + " in " + dataGenDuration
-                             + " seconds from " + InetAddress.getLocalHost() + " to " + host + ":" + port);
-                 }
-             }
-         } catch (Throwable t) {
-             if (t instanceof InterruptedException) {
-                 // keep the interrupt visible to whoever owns this thread
-                 Thread.currentThread().interrupt();
-             }
-             t.printStackTrace();
-         } finally {
-             s.close();
-         }
-     } catch (Throwable t) {
-         System.err.println("Error connecting to " + host + ":" + port);
-         t.printStackTrace();
-     } finally {
-         sem.release();
-     }
- }
-
- /**
-  * Writes the ordinal of {@code REACHED} as a single int to the socket, flushes the socket's
-  * output stream and logs the send at INFO level.
-  *
-  * @param s socket to write to
-  * @throws IOException if writing or flushing fails
-  */
- private void sendReached(Socket s) throws IOException {
-     DataOutputStream out = new DataOutputStream(s.getOutputStream());
-     out.writeInt(OrchestratorDGProtocol.REACHED.ordinal());
-     s.getOutputStream().flush();
-     if (!LOGGER.isLoggable(Level.INFO)) {
-         return;
-     }
-     LOGGER.info("Sent " + OrchestratorDGProtocol.REACHED + " to " + s.getRemoteSocketAddress());
- }
-
- /**
-  * Blocks until one protocol message (a single int) arrives on {@code s} and verifies that
-  * it is {@code RESUME}.
-  *
-  * @param s socket to read from
-  * @throws IOException           if reading from the socket fails
-  * @throws IllegalStateException if the received code does not map to a protocol constant,
-  *                               or maps to anything other than {@code RESUME}
-  */
- private void receiveResume(Socket s) throws IOException {
-     int msg = new DataInputStream(s.getInputStream()).readInt();
-     // The wire format is the enum ordinal; reject codes outside the enum instead of
-     // failing with a bare ArrayIndexOutOfBoundsException.
-     OrchestratorDGProtocol[] known = OrchestratorDGProtocol.values();
-     if (msg < 0 || msg >= known.length) {
-         throw new IllegalStateException("Unknown protocol message received: code " + msg);
-     }
-     OrchestratorDGProtocol msgType = known[msg];
-     if (LOGGER.isLoggable(Level.INFO)) {
-         LOGGER.info("Received " + msgType + " from " + s.getRemoteSocketAddress());
-     }
-     if (msgType != OrchestratorDGProtocol.RESUME) {
-         throw new IllegalStateException("Unknown protocol message received: " + msgType);
-     }
- }
-
- /**
-  * Writes the ordinal of {@code STOPPED} as a single int to the socket, flushes the socket's
-  * output stream and logs the send at INFO level.
-  *
-  * @param s socket to write to
-  * @throws IOException if writing or flushing fails
-  */
- private void sendStopped(Socket s) throws IOException {
-     DataOutputStream out = new DataOutputStream(s.getOutputStream());
-     out.writeInt(OrchestratorDGProtocol.STOPPED.ordinal());
-     s.getOutputStream().flush();
-     if (!LOGGER.isLoggable(Level.INFO)) {
-         return;
-     }
-     LOGGER.info("Sent " + OrchestratorDGProtocol.STOPPED + " to " + s.getRemoteSocketAddress());
- }
-
- }
-
- /**
-  * Output stream backed by a fixed 32 KiB buffer that wraps around when full, so newer
-  * bytes overwrite the oldest ones.
-  */
- private static class CircularByteArrayOutputStream extends OutputStream {
-
-     private final byte[] buf;
-
-     // Position in buf where the next byte is written.
-     private int index;
-
-     public CircularByteArrayOutputStream() {
-         buf = new byte[32 * 1024];
-         index = 0;
-     }
-
-     /**
-      * Copies {@code len} bytes from {@code b} starting at {@code off} into the circular
-      * buffer, wrapping to the start of the buffer as often as needed.
-      *
-      * @throws NullPointerException      if {@code b} is null
-      * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe a range
-      *                                   inside {@code b}
-      */
-     @Override
-     public void write(byte[] b, int off, int len) throws IOException {
-         if (b == null) {
-             throw new NullPointerException();
-         } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
-             throw new IndexOutOfBoundsException();
-         } else if (len == 0) {
-             return;
-         }
-
-         int remain = len;
-         int remainOff = off;
-         while (remain > 0) {
-             // Copy no more than what is left to write; copying the whole free tail of the
-             // buffer would read past the end of b and move the write position too far.
-             int chunk = Math.min(remain, buf.length - index);
-             System.arraycopy(b, remainOff, buf, index, chunk);
-             remainOff += chunk;
-             remain -= chunk;
-             index = (index + chunk) % buf.length;
-         }
-     }
-
-     /** Stores the low eight bits of {@code b} at the write position and advances it. */
-     @Override
-     public void write(int b) throws IOException {
-         buf[index] = (byte) b;
-         index = (index + 1) % buf.length;
-     }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorConfig.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorConfig.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorConfig.java
deleted file mode 100644
index d3bb4bd..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorConfig.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.util.List;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.kohsuke.args4j.Argument;
-import org.kohsuke.args4j.CmdLineException;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-import org.kohsuke.args4j.OptionDef;
-import org.kohsuke.args4j.spi.OptionHandler;
-import org.kohsuke.args4j.spi.Parameters;
-import org.kohsuke.args4j.spi.Setter;
-
-/**
- * Command line options (args4j) for the socket tweet generator.
- */
-public class SocketTweetGeneratorConfig {
-
-    @Option(name = "-p", aliases = "--partition-range-start", usage = "Starting partition number for the set of data generators (default = 0)")
-    private int partitionRangeStart = 0;
-
-    public int getPartitionRangeStart() {
-        return partitionRangeStart;
-    }
-
-    @Option(name = "-d", aliases = { "--datagen-duration" }, usage = "Duration in seconds to run data generation")
-    private int duration = -1;
-
-    public int getDataGenDuration() {
-        return duration;
-    }
-
-    @Option(name = "-qd", aliases = { "--querygen-duration" }, usage = "Duration in seconds to run query generation")
-    private int queryDuration = -1;
-
-    public int getQueryGenDuration() {
-        return queryDuration;
-    }
-
-    @Option(name = "-di", aliases = "--data-interval", usage = "Initial data interval to use when generating data based on data size")
-    private long dataInterval = -1;
-
-    public long getDataInterval() {
-        return dataInterval;
-    }
-
-    @Option(name = "-ni", aliases = "--num-intervals", usage = "Number of intervals to use when generating data based on data size (default = 4)")
-    private int nIntervals = 4;
-
-    public int getNIntervals() {
-        return nIntervals;
-    }
-
-    // "--orachestrator-host" is the historical misspelling; it is kept so existing scripts keep working.
-    @Option(name = "-oh", aliases = { "--orchestrator-host", "--orachestrator-host" }, usage = "The host name of the orchestrator")
-    private String orchHost;
-
-    public String getOrchestratorHost() {
-        return orchHost;
-    }
-
-    @Option(name = "-op", aliases = "--orchestrator-port", usage = "The port number of the orchestrator")
-    private int orchPort;
-
-    public int getOrchestratorPort() {
-        return orchPort;
-    }
-
-    @Option(name = "-of", aliases = "--openstreetmap-filepath", usage = "The open street map gps point data file path")
-    private String openStreetMapFilePath;
-
-    public String getOpenStreetMapFilePath() {
-        return openStreetMapFilePath;
-    }
-
-    @Option(name = "-si", aliases = "--location-sample-interval", usage = "Location sample interval from open street map point data")
-    private int locationSampleInterval;
-
-    public int getLocationSampleInterval() {
-        return locationSampleInterval;
-    }
-
-    @Option(name = "-rcbi", aliases = "--record-count-per-batch-during-ingestion-only", usage = "Record count per batch during ingestion only")
-    private int recordCountPerBatchDuringIngestionOnly = 1000;
-
-    public int getRecordCountPerBatchDuringIngestionOnly() {
-        return recordCountPerBatchDuringIngestionOnly;
-    }
-
-    @Option(name = "-rcbq", aliases = "--record-count-per-batch-during-query", usage = "Record count per batch during query")
-    private int recordCountPerBatchDuringQuery = 1000;
-
-    public int getRecordCountPerBatchDuringQuery() {
-        return recordCountPerBatchDuringQuery;
-    }
-
-    @Option(name = "-dsti", aliases = "--data-gen-sleep-time-during-ingestion-only", usage = "DataGen sleep time in milliseconds after every recordCountPerBatchDuringIngestionOnly records were sent")
-    private long dataGenSleepTimeDuringIngestionOnly = 1;
-
-    public long getDataGenSleepTimeDuringIngestionOnly() {
-        return dataGenSleepTimeDuringIngestionOnly;
-    }
-
-    @Option(name = "-dstq", aliases = "--data-gen-sleep-time-during-query", usage = "DataGen sleep time in milliseconds after every recordCountPerBatchDuringQuery records were sent")
-    private long dataGenSleepTimeDuringQuery = 1;
-
-    public long getDataGenSleepTimeDuringQuery() {
-        return dataGenSleepTimeDuringQuery;
-    }
-
-    @Argument(required = true, usage = "A list of <ip>:<port> pairs (addresses) to send data to", metaVar = "addresses...", handler = AddressOptionHandler.class)
-    private List<Pair<String, Integer>> addresses;
-
-    public List<Pair<String, Integer>> getAddresses() {
-        return addresses;
-    }
-
-    /**
-     * Parses every remaining argument as {@code <host>:<port>} and adds it to the target list.
-     */
-    public static class AddressOptionHandler extends OptionHandler<Pair<String, Integer>> {
-
-        public AddressOptionHandler(CmdLineParser parser, OptionDef option, Setter<? super Pair<String, Integer>> setter) {
-            super(parser, option, setter);
-        }
-
-        /**
-         * @return the number of arguments consumed
-         * @throws CmdLineException if an argument is not {@code <host>:<port>} with a non-empty
-         *                          host and a port between 1 and 65535
-         */
-        @Override
-        public int parseArguments(Parameters params) throws CmdLineException {
-            int counter = 0;
-            while (true) {
-                String param;
-                try {
-                    param = params.getParameter(counter);
-                } catch (CmdLineException ex) {
-                    // no parameter at this position: everything has been consumed
-                    break;
-                }
-
-                String[] hostPort = param.split(":");
-                if (hostPort.length != 2 || hostPort[0].isEmpty()) {
-                    throw new CmdLineException("Invalid address: " + param + ". Expected <host>:<port>");
-                }
-                Integer port = null;
-                try {
-                    port = Integer.parseInt(hostPort[1]);
-                } catch (NumberFormatException e) {
-                    throw new CmdLineException("Invalid port " + hostPort[1] + " for address " + param + ".");
-                }
-                // Reject ports that can never be connected to, instead of failing later
-                // inside a generator thread.
-                if (port < 1 || port > 65535) {
-                    throw new CmdLineException("Invalid port " + hostPort[1] + " for address " + param + ".");
-                }
-                setter.addValue(Pair.of(hostPort[0], port));
-                counter++;
-            }
-            return counter;
-        }
-
-        @Override
-        public String getDefaultMetaVariable() {
-            return "addresses";
-        }
-
-    }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorDriver.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorDriver.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorDriver.java
deleted file mode 100644
index d64809f..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorDriver.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import org.kohsuke.args4j.CmdLineException;
-import org.kohsuke.args4j.CmdLineParser;
-
-public class SocketTweetGeneratorDriver {
- public static void main(String[] args) throws Exception {
- SocketTweetGeneratorConfig clientConfig = new SocketTweetGeneratorConfig();
- CmdLineParser clp = new CmdLineParser(clientConfig);
- try {
- clp.parseArgument(args);
- } catch (CmdLineException e) {
- System.err.println(e.getMessage());
- clp.printUsage(System.err);
- System.exit(1);
- }
-
- if ((clientConfig.getDataInterval() == -1 && clientConfig.getDataGenDuration() == -1)
- || (clientConfig.getDataInterval() > 0 && clientConfig.getDataGenDuration() > 0)) {
- System.err.println("Must use exactly one of -d or -di");
- clp.printUsage(System.err);
- System.exit(1);
- }
-
- SocketTweetGenerator client = new SocketTweetGenerator(clientConfig);
- client.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SpatialIndexExperiment2OrchestratorServer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SpatialIndexExperiment2OrchestratorServer.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SpatialIndexExperiment2OrchestratorServer.java
deleted file mode 100644
index 4e360dd..0000000
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SpatialIndexExperiment2OrchestratorServer.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.experiment.client;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Coordinates a group of data generators and a group of query generators over two TCP listening ports.
- *
- * <p>After {@link #start()} a background thread runs the following sequence:
- * <ol>
- * <li>accept {@code nDataGens} connections on the data-generator port, then {@code nQueryGens} connections on
- * the query-generator port;</li>
- * <li>read one {@code REACHED} message from every query generator;</li>
- * <li>read {@code nIntervals} rounds of {@code REACHED} messages, one per data generator per round;</li>
- * <li>send {@code RESUME} to every query generator, then to every data generator;</li>
- * <li>read a final {@code REACHED} from every query generator, then from every data generator.</li>
- * </ol>
- * Messages are exchanged as the {@code int} ordinal of an {@code OrchestratorDGProtocol} constant.
- * {@link #awaitFinished()} blocks until that sequence has ended, whether it completed or failed.
- */
-public class SpatialIndexExperiment2OrchestratorServer {
-
-    private static final Logger LOGGER = Logger.getLogger(SpatialIndexExperiment2OrchestratorServer.class.getName());
-
-    private final int dataGenPort;
-
-    private final int queryGenPort;
-
-    private final int nDataGens;
-
-    private final int nQueryGens;
-
-    private final int nIntervals;
-
-    /** True from a successful {@link #start()} until the background sequence has ended. */
-    private final AtomicBoolean running;
-
-    /**
-     * @param dataGenPort port on which data generators connect
-     * @param nDataGens number of data-generator connections to accept
-     * @param nIntervals number of {@code REACHED} rounds read from the data generators before resuming
-     * @param queryGenPort port on which query generators connect
-     * @param nQueryGens number of query-generator connections to accept
-     */
-    public SpatialIndexExperiment2OrchestratorServer(int dataGenPort, int nDataGens, int nIntervals, int queryGenPort,
-            int nQueryGens) {
-        this.dataGenPort = dataGenPort;
-        this.nDataGens = nDataGens;
-        this.queryGenPort = queryGenPort;
-        this.nQueryGens = nQueryGens;
-        this.nIntervals = nIntervals;
-        running = new AtomicBoolean();
-    }
-
-    /**
-     * Binds both listening sockets and launches the background thread that runs the coordination sequence.
-     * Both ports are bound when this method returns.
-     *
-     * <p>The sockets are bound on the calling thread so that a bind failure is reported to the caller.
-     * Binding them on the worker thread would leave the caller waiting for a "bound" signal that never
-     * arrives when the bind fails.
-     *
-     * @throws IOException if either port cannot be bound; nothing is left open in that case
-     * @throws InterruptedException never thrown by this implementation; retained so existing callers still compile
-     */
-    public synchronized void start() throws IOException, InterruptedException {
-        final ServerSocket dataGenSS = new ServerSocket(dataGenPort);
-        final ServerSocket queryGenSS;
-        try {
-            queryGenSS = new ServerSocket(queryGenPort);
-        } catch (IOException | RuntimeException e) {
-            // Do not leak the first listener when the second one cannot be created.
-            closeQuietly(dataGenSS);
-            throw e;
-        }
-
-        running.set(true);
-        Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    coordinate(dataGenSS, queryGenSS);
-                } catch (Throwable failure) {
-                    LOGGER.log(Level.SEVERE, "Orchestration failed", failure);
-                } finally {
-                    closeQuietly(dataGenSS);
-                    closeQuietly(queryGenSS);
-                    // Always release waiters, also after a failure; otherwise awaitFinished() never returns.
-                    synchronized (SpatialIndexExperiment2OrchestratorServer.this) {
-                        running.set(false);
-                        SpatialIndexExperiment2OrchestratorServer.this.notifyAll();
-                    }
-                }
-            }
-
-        });
-        t.start();
-    }
-
-    /**
-     * Runs the accept / REACHED / RESUME / REACHED sequence described on the class and closes every accepted
-     * connection afterwards. The listening sockets are owned and closed by the caller.
-     */
-    private void coordinate(ServerSocket dataGenSS, ServerSocket queryGenSS) throws IOException {
-        Socket[] dataConn = new Socket[nDataGens];
-        Socket[] queryConn = new Socket[nQueryGens];
-        try {
-            //#.wait until all dataGens and queryGens have connected to the orchestrator
-            for (int i = 0; i < nDataGens; i++) {
-                dataConn[i] = dataGenSS.accept();
-            }
-            for (int i = 0; i < nQueryGens; i++) {
-                queryConn[i] = queryGenSS.accept();
-            }
-
-            //#.wait until queryGens are ready for generating query
-            for (int i = 0; i < nQueryGens; i++) {
-                receiveReached(queryConn[i]);
-            }
-
-            //#.wait until dataGens are ready for generating data after nIntervals of data were generated
-            for (int i = 0; i < nIntervals; i++) {
-                for (int j = 0; j < nDataGens; j++) {
-                    receiveReached(dataConn[j]);
-                }
-            }
-
-            //#.send signal to queryGens to start sending queries
-            for (int i = 0; i < nQueryGens; i++) {
-                sendResume(queryConn[i]);
-            }
-            //#.send signal to dataGens to start sending records
-            for (int i = 0; i < nDataGens; i++) {
-                sendResume(dataConn[i]);
-            }
-
-            //#.wait until both dataGen and queryGen's are done
-            for (int i = 0; i < nQueryGens; i++) {
-                receiveReached(queryConn[i]);
-            }
-            for (int i = 0; i < nDataGens; i++) {
-                receiveReached(dataConn[i]);
-            }
-        } finally {
-            // Slots that were never filled are still null; closeQuietly skips them.
-            for (Socket conn : dataConn) {
-                closeQuietly(conn);
-            }
-            for (Socket conn : queryConn) {
-                closeQuietly(conn);
-            }
-        }
-    }
-
-    /** Closes {@code resource} if it is non-null; a failure is logged so that the remaining resources still get closed. */
-    private static void closeQuietly(AutoCloseable resource) {
-        if (resource == null) {
-            return;
-        }
-        try {
-            resource.close();
-        } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "Failed to close " + resource, e);
-        }
-    }
-
-    /**
-     * Reads one {@code int} from {@code conn} and checks that it is the ordinal of {@code REACHED}.
-     *
-     * @throws IOException if the message cannot be read
-     * @throws IllegalStateException if the value is not a valid ordinal or names a constant other than {@code REACHED}
-     */
-    private static void receiveReached(Socket conn) throws IOException {
-        int msg = new DataInputStream(conn.getInputStream()).readInt();
-        OrchestratorDGProtocol[] known = OrchestratorDGProtocol.values();
-        if (msg < 0 || msg >= known.length) {
-            // Report a malformed message as a protocol error rather than an ArrayIndexOutOfBoundsException.
-            throw new IllegalStateException("Received out-of-range message ordinal " + msg);
-        }
-        OrchestratorDGProtocol msgType = known[msg];
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received " + msgType + " from " + conn.getRemoteSocketAddress());
-        }
-        if (msgType != OrchestratorDGProtocol.REACHED) {
-            throw new IllegalStateException("Encounted unknown message type " + msgType);
-        }
-    }
-
-    /**
-     * Writes the ordinal of {@code RESUME} to {@code s} and flushes it.
-     *
-     * @throws IOException if the message cannot be written
-     */
-    private void sendResume(Socket s) throws IOException {
-        DataOutputStream out = new DataOutputStream(s.getOutputStream());
-        out.writeInt(OrchestratorDGProtocol.RESUME.ordinal());
-        out.flush();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Sent " + OrchestratorDGProtocol.RESUME + " to " + s.getRemoteSocketAddress());
-        }
-    }
-
-    /**
-     * Blocks until the background sequence launched by {@link #start()} has ended. Returns immediately when
-     * nothing is running.
-     *
-     * @throws InterruptedException if the calling thread is interrupted while waiting
-     */
-    public synchronized void awaitFinished() throws InterruptedException {
-        while (running.get()) {
-            wait();
-        }
-    }
-
-}