You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:15 UTC
[17/50] [abbrv] git commit: Rename packages in preparation for move
to Apache
Rename packages in preparation for move to Apache
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/df4c8078
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/df4c8078
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/df4c8078
Branch: refs/heads/dev
Commit: df4c807808aa36184f2e70d9367c1fc3eedea544
Parents: 169653c
Author: Bruce Robbins <ro...@everychoose-lm.corp.yahoo.com>
Authored: Sun Nov 20 22:44:20 2011 -0800
Committer: Bruce Robbins <ro...@everychoose-lm.corp.yahoo.com>
Committed: Sun Nov 20 22:44:20 2011 -0800
----------------------------------------------------------------------
.../io/s4/tools/loadgenerator/LoadGenerator.java | 395 ---------------
.../main/java/io/s4/tools/loadgenerator/Pacer.java | 131 -----
.../s4/tools/loadgenerator/LoadGenerator.java | 395 +++++++++++++++
.../org/apache/s4/tools/loadgenerator/Pacer.java | 131 +++++
4 files changed, 526 insertions(+), 526 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/df4c8078/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/LoadGenerator.java
----------------------------------------------------------------------
diff --git a/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/LoadGenerator.java b/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/LoadGenerator.java
deleted file mode 100644
index 534e67a..0000000
--- a/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/LoadGenerator.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.tools.loadgenerator;
-
-import io.s4.client.Driver;
-import io.s4.client.Message;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public class LoadGenerator {
-
- public static void main(String args[]) {
- Options options = new Options();
- boolean warmUp = false;
-
- options.addOption(OptionBuilder.withArgName("rate")
- .hasArg()
- .withDescription("Rate (events per second)")
- .create("r"));
-
- options.addOption(OptionBuilder.withArgName("display_rate")
- .hasArg()
- .withDescription("Display Rate at specified second boundary")
- .create("d"));
-
- options.addOption(OptionBuilder.withArgName("adapter_address")
- .hasArg()
- .withDescription("Address of client adapter")
- .create("a"));
-
- options.addOption(OptionBuilder.withArgName("listener_application_name")
- .hasArg()
- .withDescription("Listener application name")
- .create("g"));
-
- options.addOption(OptionBuilder.withArgName("sleep_overhead")
- .hasArg()
- .withDescription("Sleep overhead")
- .create("o"));
-
- options.addOption(new Option("w", "Warm-up"));
-
- CommandLineParser parser = new GnuParser();
-
- CommandLine line = null;
- try {
- // parse the command line arguments
- line = parser.parse(options, args);
- } catch (ParseException exp) {
- // oops, something went wrong
- System.err.println("Parsing failed. Reason: " + exp.getMessage());
- System.exit(1);
- }
-
- int expectedRate = 250;
- if (line.hasOption("r")) {
- try {
- expectedRate = Integer.parseInt(line.getOptionValue("r"));
- } catch (Exception e) {
- System.err.println("Bad expected rate specified "
- + line.getOptionValue("r"));
- System.exit(1);
- }
- }
-
- int displayRateIntervalSeconds = 20;
- if (line.hasOption("d")) {
- try {
- displayRateIntervalSeconds = Integer.parseInt(line.getOptionValue("d"));
- } catch (Exception e) {
- System.err.println("Bad display rate value specified "
- + line.getOptionValue("d"));
- System.exit(1);
- }
- }
-
- int updateFrequency = 0;
- if (line.hasOption("f")) {
- try {
- updateFrequency = Integer.parseInt(line.getOptionValue("f"));
- } catch (Exception e) {
- System.err.println("Bad query udpdate frequency specified "
- + line.getOptionValue("f"));
- System.exit(1);
- }
- System.out.printf("Update frequency is %d\n", updateFrequency);
- }
-
- String clientAdapterAddress = null;
- String clientAdapterHost = null;
- int clientAdapterPort = -1;
- if (line.hasOption("a")) {
- clientAdapterAddress = line.getOptionValue("a");
- String[] parts = clientAdapterAddress.split(":");
- if (parts.length != 2) {
- System.err.println("Bad adapter address specified "
- + clientAdapterAddress);
- System.exit(1);
- }
- clientAdapterHost = parts[0];
-
- try {
- clientAdapterPort = Integer.parseInt(parts[1]);
- }
- catch (NumberFormatException nfe) {
- System.err.println("Bad adapter address specified "
- + clientAdapterAddress);
- System.exit(1);
- }
- }
-
- long sleepOverheadMicros = -1;
- if (line.hasOption("o")) {
- try {
- sleepOverheadMicros = Long.parseLong(line.getOptionValue("o"));
- } catch (NumberFormatException e) {
- System.err.println("Bad sleep overhead specified "
- + line.getOptionValue("o"));
- System.exit(1);
- }
- System.out.printf("Specified sleep overhead is %d\n",
- sleepOverheadMicros);
- }
-
- if (line.hasOption("w")) {
- warmUp = true;
- }
-
- List loArgs = line.getArgList();
- if (loArgs.size() < 1) {
- System.err.println("No input file specified");
- System.exit(1);
- }
-
- String inputFilename = (String) loArgs.get(0);
-
- LoadGenerator loadGenerator = new LoadGenerator();
- loadGenerator.setInputFilename(inputFilename);
- loadGenerator.setDisplayRateInterval(displayRateIntervalSeconds);
- loadGenerator.setExpectedRate(expectedRate);
- loadGenerator.setClientAdapterHost(clientAdapterHost);
- loadGenerator.setClientAdapterPort(clientAdapterPort);
- loadGenerator.run();
-
- System.exit(0);
- }
-
- private String inputFilename;
- private int emitCount;
- private int displayRateInterval = 0;
- private int expectedRate = 200;
- private String clientAdapterHost = null;
- private int clientAdapterPort = -1;
-
- private int adjustedExpectedRate = 1;
- private Map<Integer, EventTypeInfo> eventTypeInfoMap = new HashMap<Integer, EventTypeInfo>();
- private Driver driver;
- private boolean isConnected;
-
- public int getEmitCount() {
- return emitCount;
- }
-
- public void setInputFilename(String inputFilename) {
- this.inputFilename = inputFilename;
- }
-
- public void setDisplayRateInterval(int displayRateInterval) {
- this.displayRateInterval = displayRateInterval;
- }
-
- public void setExpectedRate(int expectedRate) {
- this.expectedRate = expectedRate;
- }
-
- public void setClientAdapterHost(String clientAdapterHost) {
- this.clientAdapterHost = clientAdapterHost;
- }
-
- public void setClientAdapterPort(int clientAdapterPort) {
- this.clientAdapterPort = clientAdapterPort;
- }
-
- public LoadGenerator() {
-
- }
-
- public void run() {
- // for now, no warm-up mechanism
- adjustedExpectedRate = expectedRate;
-
- long intervalStart = 0;
- int emitCountStart = 0;
-
- BufferedReader br = null;
- Reader inputReader = null;
- try {
- if (!connect()) {
- System.err.println("Failed to initialize client adapter driver");
- return;
- }
-
- if (inputFilename.equals("-")) {
- inputReader = new InputStreamReader(System.in);
- } else {
- inputReader = new FileReader(inputFilename);
- }
- br = new BufferedReader(inputReader);
- String inputLine = null;
- boolean firstLine = true;
-
- Pacer pacer = new Pacer(adjustedExpectedRate);
- while ((inputLine = br.readLine()) != null) {
- if (firstLine) {
- JSONObject jsonRecord = new JSONObject(inputLine);
- createEventTypeInfo(jsonRecord);
- System.out.println(eventTypeInfoMap);
- if (eventTypeInfoMap.size() == 0) {
- return;
- }
- firstLine = false;
- continue;
- }
-
- pacer.startCycle();
-
- try {
- JSONObject jsonRecord = new JSONObject(inputLine);
- int classIndex = jsonRecord.getInt("_index");
- EventTypeInfo eventTypeInfo = eventTypeInfoMap.get(classIndex);
-
- if (eventTypeInfo == null) {
- System.err.printf("Invalid _index value %d\n",
- classIndex);
- return;
- }
-
- Message message = new Message(eventTypeInfo.getStreamName(), eventTypeInfo.getClassName(), inputLine);
- sendMessage(message);
- emitCount++;
- } catch (JSONException je) {
- je.printStackTrace();
- System.err.printf("Bad input data %s\n", inputLine);
- continue;
- }
-
- // if it's time, display the actual emit rate
- if (intervalStart == 0) {
- intervalStart = System.currentTimeMillis();
- } else {
- long interval = System.currentTimeMillis() - intervalStart;
- if (interval >= (displayRateInterval * 1000)) {
- double rate = (emitCount - emitCountStart)
- / (interval / 1000.0);
- System.out.println("Rate is " + rate);
- intervalStart = System.currentTimeMillis();
- emitCountStart = emitCount;
- }
- }
-
- pacer.endCycle();
- pacer.maintainPace();
- }
- System.out.printf("Emitted %d events\n", emitCount);
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- try {
- br.close();
- } catch (Exception e) {
- }
- try {
- inputReader.close();
- } catch (Exception e) {
- }
- try {
- driver.disconnect();
- } catch (Exception e) {
- }
- }
- }
-
- public boolean connect() {
- isConnected = false;
- try {
- System.out.println("Connecting...");
- driver = new Driver(clientAdapterHost, clientAdapterPort);
- boolean isInitialized = driver.init();
- isConnected = isInitialized & driver.connect();
- System.out.println("Connection made: " + isConnected);
- return isConnected;
- }
- catch (IOException ioe) {
- System.out.println("Connection made: " + isConnected);
- return isConnected;
- }
- catch (NullPointerException npe) {
- // there's a bug in the driver that causes a null pointer exception if
- // if the target server is down
- System.out.println("Connection made: " + isConnected);
- return isConnected;
- }
- }
-
- public boolean sendMessage(Message message) {
- final int MAX_RETRY = 5;
- boolean sent = false;
- int backoff = 10;
- for (int retries = 0; retries < MAX_RETRY; retries++) {
- try {
- if (!isConnected) {
- throw new IOException("Driver not connected");
- }
- driver.send(message);
- sent = true;
- break;
- }
- catch (IOException ioe) {
- try {
- System.out.printf("Sleeping for %f seconds\n", backoff/1000.0);
- Thread.sleep(backoff);
- } catch (InterruptedException ie) {}
- backoff = backoff*5;
- connect();
- }
- }
- return sent;
- }
-
- @SuppressWarnings("unchecked")
- public void createEventTypeInfo(JSONObject classInfo) {
- String className = "";
- try {
- for (Iterator it = classInfo.keys(); it.hasNext();) {
- className = (String) it.next();
- JSONObject jsonEventTypeInfo = classInfo.getJSONObject(className);
- int classIndex = (Integer) jsonEventTypeInfo.getInt("classIndex");
- String streamName = jsonEventTypeInfo.getString("streamName");
- eventTypeInfoMap.put(classIndex, new EventTypeInfo(className,
- streamName));
- }
- } catch (JSONException je) {
- je.printStackTrace();
- }
- }
-
- static class EventTypeInfo {
- private String className;
- private String streamName;
-
- public EventTypeInfo(String clazz, String streamName) {
- this.className = clazz;
- this.streamName = streamName;
- }
-
- public String getClassName() {
- return className;
- }
-
- public String getStreamName() {
- return streamName;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/df4c8078/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/Pacer.java
----------------------------------------------------------------------
diff --git a/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/Pacer.java b/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/Pacer.java
deleted file mode 100644
index 7aba882..0000000
--- a/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/Pacer.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.tools.loadgenerator;;
-
-public class Pacer {
- private long sleepOverheadMicros = -1;
- private int expectedRate = -1;
- private int adjustedExpectedRate = 1;
- private long startTime;
- private int cycleCount = 0;
-
- private static int PROCESS_TIME_LIST_MAX_SIZE = 15;
- private long[] processTimes = new long[PROCESS_TIME_LIST_MAX_SIZE];
- private int processTimePointer = 0;
- private long[] rateInfo = new long[] {0,100};
-
- public Pacer(int expectedRate) {
- this.expectedRate = expectedRate;
- this.adjustedExpectedRate = expectedRate; // the same for now
-
- if (sleepOverheadMicros == -1) {
- // calculate sleep overhead
- long totalSleepOverhead = 0;
- for (int i = 0; i < 50; i++) {
- long startTime = System.nanoTime();
- try {
- Thread.sleep(1);
- } catch (InterruptedException ie) {
- }
- totalSleepOverhead += (System.nanoTime() - startTime)
- - (1 * 1000 * 1000);
- }
- sleepOverheadMicros = (totalSleepOverhead / 50) / 1000;
- }
- }
-
- public void startCycle() {
- startTime = System.nanoTime();
- }
-
- public void endCycle() {
- processTimes[processTimePointer] = System.nanoTime() - startTime;
- processTimePointer = (processTimePointer == PROCESS_TIME_LIST_MAX_SIZE - 1) ? 0
- : processTimePointer + 1;
-
- cycleCount++;
-
- }
-
- public void maintainPace() {
- if (cycleCount == 1 || cycleCount % 20 == 0) {
- rateInfo = getRateInfo(rateInfo);
- }
- if (rateInfo[1] == 0 || cycleCount % rateInfo[1] == 0) {
- try {
- Thread.sleep(rateInfo[0]);
- } catch (InterruptedException ie) {
- }
- }
- }
-
- private long[] getRateInfo(long[] rateInfo) {
- long totalTimeNanos = 0;
- int entryCount = 0;
- for (int i = 0; i < processTimes.length; i++) {
- if (processTimes[i] == Long.MIN_VALUE) {
- break;
- }
- entryCount++;
- totalTimeNanos += processTimes[i];
- }
- long averageTimeMicros = (long) ((totalTimeNanos / (double) entryCount) / 1000.0);
- // fudge the time for additional overhead
- averageTimeMicros += (long) (averageTimeMicros * 0.30);
-
- if (cycleCount % 5000 == 0) {
- // System.out.println("Average time in micros is " +
- // averageTimeMicros);
- }
-
- long sleepTimeMicros = 0;
- long millis = 0;
-
- long timeToMeetRateMicros = adjustedExpectedRate * averageTimeMicros;
- long leftOver = 1000000 - timeToMeetRateMicros;
- if (leftOver <= 0) {
- sleepTimeMicros = 0;
- } else {
- sleepTimeMicros = (leftOver / adjustedExpectedRate)
- - sleepOverheadMicros;
- }
-
- // how many events can be processed in the nanos time?
- int eventsBeforeSleep = 1;
- if (sleepTimeMicros < 1000) {
- // less than 1 millisecond sleep time, so need to stagger sleeps to
- // emulate such a sleep
- sleepTimeMicros = 1000 + sleepOverheadMicros;
- millis = 1;
- double numNapsDouble = ((double) leftOver / sleepTimeMicros);
- int numNaps = (int) Math.ceil(numNapsDouble);
- if (numNaps > 0) {
- eventsBeforeSleep = adjustedExpectedRate / numNaps;
- }
-
- if (leftOver <= 0) {
- millis = 0;
- eventsBeforeSleep = 1000;
- }
- } else {
- millis = sleepTimeMicros / 1000;
- }
-
- rateInfo[0] = millis;
- rateInfo[1] = eventsBeforeSleep;
- return rateInfo;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/df4c8078/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/LoadGenerator.java
----------------------------------------------------------------------
diff --git a/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/LoadGenerator.java b/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/LoadGenerator.java
new file mode 100644
index 0000000..2db0e37
--- /dev/null
+++ b/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/LoadGenerator.java
@@ -0,0 +1,395 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.tools.loadgenerator;
+
+import org.apache.s4.client.Driver;
+import org.apache.s4.client.Message;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class LoadGenerator {
+
+ public static void main(String args[]) {
+ Options options = new Options();
+ boolean warmUp = false;
+
+ options.addOption(OptionBuilder.withArgName("rate")
+ .hasArg()
+ .withDescription("Rate (events per second)")
+ .create("r"));
+
+ options.addOption(OptionBuilder.withArgName("display_rate")
+ .hasArg()
+ .withDescription("Display Rate at specified second boundary")
+ .create("d"));
+
+ options.addOption(OptionBuilder.withArgName("adapter_address")
+ .hasArg()
+ .withDescription("Address of client adapter")
+ .create("a"));
+
+ options.addOption(OptionBuilder.withArgName("listener_application_name")
+ .hasArg()
+ .withDescription("Listener application name")
+ .create("g"));
+
+ options.addOption(OptionBuilder.withArgName("sleep_overhead")
+ .hasArg()
+ .withDescription("Sleep overhead")
+ .create("o"));
+
+ options.addOption(new Option("w", "Warm-up"));
+
+ CommandLineParser parser = new GnuParser();
+
+ CommandLine line = null;
+ try {
+ // parse the command line arguments
+ line = parser.parse(options, args);
+ } catch (ParseException exp) {
+ // oops, something went wrong
+ System.err.println("Parsing failed. Reason: " + exp.getMessage());
+ System.exit(1);
+ }
+
+ int expectedRate = 250;
+ if (line.hasOption("r")) {
+ try {
+ expectedRate = Integer.parseInt(line.getOptionValue("r"));
+ } catch (Exception e) {
+ System.err.println("Bad expected rate specified "
+ + line.getOptionValue("r"));
+ System.exit(1);
+ }
+ }
+
+ int displayRateIntervalSeconds = 20;
+ if (line.hasOption("d")) {
+ try {
+ displayRateIntervalSeconds = Integer.parseInt(line.getOptionValue("d"));
+ } catch (Exception e) {
+ System.err.println("Bad display rate value specified "
+ + line.getOptionValue("d"));
+ System.exit(1);
+ }
+ }
+
+ int updateFrequency = 0;
+ if (line.hasOption("f")) {
+ try {
+ updateFrequency = Integer.parseInt(line.getOptionValue("f"));
+ } catch (Exception e) {
+ System.err.println("Bad query udpdate frequency specified "
+ + line.getOptionValue("f"));
+ System.exit(1);
+ }
+ System.out.printf("Update frequency is %d\n", updateFrequency);
+ }
+
+ String clientAdapterAddress = null;
+ String clientAdapterHost = null;
+ int clientAdapterPort = -1;
+ if (line.hasOption("a")) {
+ clientAdapterAddress = line.getOptionValue("a");
+ String[] parts = clientAdapterAddress.split(":");
+ if (parts.length != 2) {
+ System.err.println("Bad adapter address specified "
+ + clientAdapterAddress);
+ System.exit(1);
+ }
+ clientAdapterHost = parts[0];
+
+ try {
+ clientAdapterPort = Integer.parseInt(parts[1]);
+ }
+ catch (NumberFormatException nfe) {
+ System.err.println("Bad adapter address specified "
+ + clientAdapterAddress);
+ System.exit(1);
+ }
+ }
+
+ long sleepOverheadMicros = -1;
+ if (line.hasOption("o")) {
+ try {
+ sleepOverheadMicros = Long.parseLong(line.getOptionValue("o"));
+ } catch (NumberFormatException e) {
+ System.err.println("Bad sleep overhead specified "
+ + line.getOptionValue("o"));
+ System.exit(1);
+ }
+ System.out.printf("Specified sleep overhead is %d\n",
+ sleepOverheadMicros);
+ }
+
+ if (line.hasOption("w")) {
+ warmUp = true;
+ }
+
+ List loArgs = line.getArgList();
+ if (loArgs.size() < 1) {
+ System.err.println("No input file specified");
+ System.exit(1);
+ }
+
+ String inputFilename = (String) loArgs.get(0);
+
+ LoadGenerator loadGenerator = new LoadGenerator();
+ loadGenerator.setInputFilename(inputFilename);
+ loadGenerator.setDisplayRateInterval(displayRateIntervalSeconds);
+ loadGenerator.setExpectedRate(expectedRate);
+ loadGenerator.setClientAdapterHost(clientAdapterHost);
+ loadGenerator.setClientAdapterPort(clientAdapterPort);
+ loadGenerator.run();
+
+ System.exit(0);
+ }
+
+ // Path of the input read by run(); the value "-" selects standard input.
+ private String inputFilename;
+ // Running count of events emitted by run(); exposed through getEmitCount().
+ private int emitCount;
+ // Minimum time, in seconds, between the "Rate is ..." reports printed by run().
+ private int displayRateInterval = 0;
+ // Target send rate; run() copies it into adjustedExpectedRate.
+ private int expectedRate = 200;
+ // Host and port handed to the Driver constructor in connect().
+ private String clientAdapterHost = null;
+ private int clientAdapterPort = -1;
+
+ // Rate given to the Pacer; run() currently sets it equal to expectedRate.
+ private int adjustedExpectedRate = 1;
+ // Event type index -> class/stream names; filled by createEventTypeInfo().
+ private Map<Integer, EventTypeInfo> eventTypeInfoMap = new HashMap<Integer, EventTypeInfo>();
+ // Client driver created by connect() and used by sendMessage().
+ private Driver driver;
+ // Outcome of the most recent connect() attempt.
+ private boolean isConnected;
+
+ /** Returns the current value of the emit counter maintained by run(). */
+ public int getEmitCount() {
+ return emitCount;
+ }
+
+ /** Sets the input to read; run() treats "-" as standard input. */
+ public void setInputFilename(String inputFilename) {
+ this.inputFilename = inputFilename;
+ }
+
+ /** Sets the interval, in seconds, between rate reports printed by run(). */
+ public void setDisplayRateInterval(int displayRateInterval) {
+ this.displayRateInterval = displayRateInterval;
+ }
+
+ /** Sets the target rate that run() hands to the Pacer. */
+ public void setExpectedRate(int expectedRate) {
+ this.expectedRate = expectedRate;
+ }
+
+ /** Sets the host name passed to the Driver by connect(). */
+ public void setClientAdapterHost(String clientAdapterHost) {
+ this.clientAdapterHost = clientAdapterHost;
+ }
+
+ /** Sets the port passed to the Driver by connect(). */
+ public void setClientAdapterPort(int clientAdapterPort) {
+ this.clientAdapterPort = clientAdapterPort;
+ }
+
+ /** Creates a generator with default settings; configure it through the setters. */
+ public LoadGenerator() {
+
+ }
+
+ /**
+  * Connects to the client adapter, then reads the input line by line and
+  * sends one message per record at the configured rate.
+  *
+  * <p>The first input line is a JSON object describing the event types
+  * (see {@code createEventTypeInfo}); every later line is a JSON record
+  * whose {@code _index} field selects the event type. Lines that are not
+  * valid JSON are reported and skipped. An unknown {@code _index}, or a
+  * header that yields no event types, ends the run.
+  *
+  * <p>Only events that {@code sendMessage} reports as sent are counted in
+  * the emit counter and in the displayed rate.
+  *
+  * @throws RuntimeException wrapping any other exception raised while
+  *         reading or sending
+  */
+ public void run() {
+     // for now, no warm-up mechanism
+     adjustedExpectedRate = expectedRate;
+
+     long intervalStart = 0;
+     int emitCountStart = 0;
+
+     BufferedReader br = null;
+     Reader inputReader = null;
+     try {
+         if (!connect()) {
+             System.err.println("Failed to initialize client adapter driver");
+             return;
+         }
+
+         if (inputFilename.equals("-")) {
+             inputReader = new InputStreamReader(System.in);
+         } else {
+             inputReader = new FileReader(inputFilename);
+         }
+         br = new BufferedReader(inputReader);
+         String inputLine = null;
+         boolean firstLine = true;
+
+         Pacer pacer = new Pacer(adjustedExpectedRate);
+         while ((inputLine = br.readLine()) != null) {
+             if (firstLine) {
+                 // Header line: the table of event types.
+                 JSONObject jsonRecord = new JSONObject(inputLine);
+                 createEventTypeInfo(jsonRecord);
+                 System.out.println(eventTypeInfoMap);
+                 if (eventTypeInfoMap.size() == 0) {
+                     return;
+                 }
+                 firstLine = false;
+                 continue;
+             }
+
+             pacer.startCycle();
+
+             try {
+                 JSONObject jsonRecord = new JSONObject(inputLine);
+                 int classIndex = jsonRecord.getInt("_index");
+                 EventTypeInfo eventTypeInfo = eventTypeInfoMap.get(classIndex);
+
+                 if (eventTypeInfo == null) {
+                     System.err.printf("Invalid _index value %d\n",
+                                       classIndex);
+                     return;
+                 }
+
+                 Message message = new Message(eventTypeInfo.getStreamName(),
+                                               eventTypeInfo.getClassName(),
+                                               inputLine);
+                 // Count the event only if it really went out; previously
+                 // the counter advanced even when every retry had failed.
+                 if (sendMessage(message)) {
+                     emitCount++;
+                 } else {
+                     System.err.printf("Failed to send event %s\n", inputLine);
+                 }
+             } catch (JSONException je) {
+                 je.printStackTrace();
+                 System.err.printf("Bad input data %s\n", inputLine);
+                 continue;
+             }
+
+             // if it's time, display the actual emit rate
+             if (intervalStart == 0) {
+                 intervalStart = System.currentTimeMillis();
+             } else {
+                 long interval = System.currentTimeMillis() - intervalStart;
+                 // 1000L: do the seconds-to-millis conversion in long
+                 // arithmetic so large intervals cannot overflow int.
+                 if (interval >= (displayRateInterval * 1000L)) {
+                     double rate = (emitCount - emitCountStart)
+                             / (interval / 1000.0);
+                     System.out.println("Rate is " + rate);
+                     intervalStart = System.currentTimeMillis();
+                     emitCountStart = emitCount;
+                 }
+             }
+
+             pacer.endCycle();
+             pacer.maintainPace();
+         }
+         System.out.printf("Emitted %d events\n", emitCount);
+     } catch (Exception e) {
+         throw new RuntimeException(e);
+     } finally {
+         closeQuietly(br);
+         closeQuietly(inputReader);
+         if (driver != null) {
+             try {
+                 driver.disconnect();
+             } catch (Exception ignored) {
+                 // best effort: nothing useful can be done at shutdown
+             }
+         }
+     }
+ }
+
+ /**
+  * Closes a reader if it was opened, ignoring any failure to close.
+  *
+  * @param reader reader to close; may be {@code null}
+  */
+ private static void closeQuietly(Reader reader) {
+     if (reader == null) {
+         return;
+     }
+     try {
+         reader.close();
+     } catch (IOException ignored) {
+         // best effort: the run is already ending
+     }
+ }
+
+ public boolean connect() {
+ isConnected = false;
+ try {
+ System.out.println("Connecting...");
+ driver = new Driver(clientAdapterHost, clientAdapterPort);
+ boolean isInitialized = driver.init();
+ isConnected = isInitialized & driver.connect();
+ System.out.println("Connection made: " + isConnected);
+ return isConnected;
+ }
+ catch (IOException ioe) {
+ System.out.println("Connection made: " + isConnected);
+ return isConnected;
+ }
+ catch (NullPointerException npe) {
+ // there's a bug in the driver that causes a null pointer exception if
+ // if the target server is down
+ System.out.println("Connection made: " + isConnected);
+ return isConnected;
+ }
+ }
+
+ public boolean sendMessage(Message message) {
+ final int MAX_RETRY = 5;
+ boolean sent = false;
+ int backoff = 10;
+ for (int retries = 0; retries < MAX_RETRY; retries++) {
+ try {
+ if (!isConnected) {
+ throw new IOException("Driver not connected");
+ }
+ driver.send(message);
+ sent = true;
+ break;
+ }
+ catch (IOException ioe) {
+ try {
+ System.out.printf("Sleeping for %f seconds\n", backoff/1000.0);
+ Thread.sleep(backoff);
+ } catch (InterruptedException ie) {}
+ backoff = backoff*5;
+ connect();
+ }
+ }
+ return sent;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void createEventTypeInfo(JSONObject classInfo) {
+ String className = "";
+ try {
+ for (Iterator it = classInfo.keys(); it.hasNext();) {
+ className = (String) it.next();
+ JSONObject jsonEventTypeInfo = classInfo.getJSONObject(className);
+ int classIndex = (Integer) jsonEventTypeInfo.getInt("classIndex");
+ String streamName = jsonEventTypeInfo.getString("streamName");
+ eventTypeInfoMap.put(classIndex, new EventTypeInfo(className,
+ streamName));
+ }
+ } catch (JSONException je) {
+ je.printStackTrace();
+ }
+ }
+
+ /**
+  * Immutable pairing of an event class name with the stream it is sent on.
+  */
+ static class EventTypeInfo {
+     private final String className;
+     private final String streamName;
+
+     /**
+      * @param clazz name of the event class
+      * @param streamName name of the stream for events of that class
+      */
+     public EventTypeInfo(String clazz, String streamName) {
+         this.className = clazz;
+         this.streamName = streamName;
+     }
+
+     /** Returns the event class name. */
+     public String getClassName() {
+         return className;
+     }
+
+     /** Returns the stream name. */
+     public String getStreamName() {
+         return streamName;
+     }
+
+     /**
+      * Returns a readable form, so that printing a collection of these
+      * objects shows their contents rather than identity hashes.
+      */
+     @Override
+     public String toString() {
+         return "EventTypeInfo[className=" + className
+                 + ", streamName=" + streamName + "]";
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/df4c8078/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/Pacer.java
----------------------------------------------------------------------
diff --git a/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/Pacer.java b/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/Pacer.java
new file mode 100644
index 0000000..d85f6ec
--- /dev/null
+++ b/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/Pacer.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.tools.loadgenerator;;
+
+/**
+ * Throttles a caller's loop towards a target number of cycles per second.
+ *
+ * <p>The caller brackets each unit of work with {@link #startCycle()} and
+ * {@link #endCycle()} and then calls {@link #maintainPace()}, which sleeps
+ * when needed. Sleep lengths are derived from the average duration of the
+ * most recent cycles and from the sleep overhead measured at construction.
+ */
+public class Pacer {
+    /** Number of most recent cycle durations kept for averaging. */
+    private static final int PROCESS_TIME_LIST_MAX_SIZE = 15;
+    /** Number of one-millisecond sleeps used to measure sleep overhead. */
+    private static final int CALIBRATION_SAMPLES = 50;
+    /** Marks a slot of {@code processTimes} that holds no measurement yet. */
+    private static final long UNUSED_SLOT = Long.MIN_VALUE;
+
+    /** Average extra time, in microseconds, a 1 ms sleep takes beyond 1 ms. */
+    private final long sleepOverheadMicros;
+    /** Rate requested by the caller. */
+    private final int expectedRate;
+    /** Rate used for the calculations; currently equal to expectedRate. */
+    private final int adjustedExpectedRate;
+    /** System.nanoTime() value captured by the latest startCycle(). */
+    private long startTime;
+    /** Number of completed cycles. */
+    private int cycleCount = 0;
+
+    /** Ring buffer of recent cycle durations, in nanoseconds. */
+    private final long[] processTimes = new long[PROCESS_TIME_LIST_MAX_SIZE];
+    private int processTimePointer = 0;
+    /** Length of each sleep in milliseconds. */
+    private long sleepMillis = 0;
+    /** Sleep once every this many cycles. */
+    private long eventsBeforeSleep = 100;
+
+    /**
+     * Creates a pacer and measures the overhead of a one-millisecond
+     * sleep, which takes at least 50 ms.
+     *
+     * @param expectedRate target number of cycles per second
+     * @throws IllegalArgumentException if {@code expectedRate} is not
+     *         positive (a zero rate previously caused a division by zero
+     *         on the first call to {@link #maintainPace()})
+     */
+    public Pacer(int expectedRate) {
+        if (expectedRate <= 0) {
+            throw new IllegalArgumentException(
+                    "expectedRate must be positive: " + expectedRate);
+        }
+        this.expectedRate = expectedRate;
+        this.adjustedExpectedRate = expectedRate; // the same for now
+
+        // Mark every slot unused so the average only covers cycles that
+        // have actually been measured.
+        for (int i = 0; i < processTimes.length; i++) {
+            processTimes[i] = UNUSED_SLOT;
+        }
+
+        this.sleepOverheadMicros = measureSleepOverheadMicros();
+    }
+
+    /**
+     * Measures how much longer than one millisecond {@code Thread.sleep(1)}
+     * takes on average. Stops early, keeping the samples taken so far, if
+     * the thread is interrupted.
+     *
+     * @return average overhead in microseconds, never negative
+     */
+    private static long measureSleepOverheadMicros() {
+        long totalOverheadNanos = 0;
+        int samples = 0;
+        for (int i = 0; i < CALIBRATION_SAMPLES; i++) {
+            long napStart = System.nanoTime();
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                break;
+            }
+            totalOverheadNanos += (System.nanoTime() - napStart)
+                    - (1 * 1000 * 1000);
+            samples++;
+        }
+        if (samples == 0) {
+            return 0;
+        }
+        return Math.max(0L, (totalOverheadNanos / samples) / 1000);
+    }
+
+    /** Marks the start of a unit of work. */
+    public void startCycle() {
+        startTime = System.nanoTime();
+    }
+
+    /**
+     * Marks the end of a unit of work and records how long it took since
+     * the latest {@link #startCycle()}.
+     */
+    public void endCycle() {
+        processTimes[processTimePointer] = System.nanoTime() - startTime;
+        processTimePointer = (processTimePointer + 1) % PROCESS_TIME_LIST_MAX_SIZE;
+
+        cycleCount++;
+    }
+
+    /**
+     * Sleeps if the current schedule calls for it. The schedule is
+     * recomputed after the first cycle and then every 20 cycles. If the
+     * sleep is interrupted the thread's interrupt status is restored.
+     */
+    public void maintainPace() {
+        if (cycleCount == 1 || cycleCount % 20 == 0) {
+            refreshSchedule();
+        }
+        if (eventsBeforeSleep == 0 || cycleCount % eventsBeforeSleep == 0) {
+            try {
+                Thread.sleep(sleepMillis);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Recomputes {@code sleepMillis} and {@code eventsBeforeSleep} from
+     * the recorded cycle durations.
+     */
+    private void refreshSchedule() {
+        long totalTimeNanos = 0;
+        int entryCount = 0;
+        for (int i = 0; i < processTimes.length; i++) {
+            if (processTimes[i] == UNUSED_SLOT) {
+                continue;
+            }
+            entryCount++;
+            totalTimeNanos += processTimes[i];
+        }
+
+        long averageTimeMicros = 0;
+        if (entryCount > 0) {
+            averageTimeMicros = (long) ((totalTimeNanos / (double) entryCount) / 1000.0);
+        }
+        // fudge the time for additional overhead
+        averageTimeMicros += (long) (averageTimeMicros * 0.30);
+
+        // Time per second spent working at the target rate, and what is
+        // left of that second for sleeping.
+        long timeToMeetRateMicros = (long) adjustedExpectedRate * averageTimeMicros;
+        long leftOver = 1000000 - timeToMeetRateMicros;
+
+        if (leftOver <= 0) {
+            // The work alone fills the second: only a zero-length sleep
+            // once every 1000 cycles.
+            sleepMillis = 0;
+            eventsBeforeSleep = 1000;
+            return;
+        }
+
+        long sleepTimeMicros = (leftOver / adjustedExpectedRate)
+                - sleepOverheadMicros;
+        if (sleepTimeMicros >= 1000) {
+            // At least a millisecond per cycle: sleep after every cycle.
+            sleepMillis = sleepTimeMicros / 1000;
+            eventsBeforeSleep = 1;
+            return;
+        }
+
+        // less than 1 millisecond sleep time, so need to stagger sleeps to
+        // emulate such a sleep: take 1 ms naps, spread over the cycles
+        long napMicros = 1000 + sleepOverheadMicros;
+        int numNaps = (int) Math.ceil((double) leftOver / napMicros);
+        sleepMillis = 1;
+        // numNaps is at least 1 here because leftOver is positive; a
+        // quotient of 0 means "nap after every cycle", the same as 1.
+        eventsBeforeSleep = Math.max(1, adjustedExpectedRate / numNaps);
+    }
+}