You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:15 UTC

[17/50] [abbrv] git commit: Rename packages in preparation for move to Apache

Rename packages in preparation for move to Apache


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/df4c8078
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/df4c8078
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/df4c8078

Branch: refs/heads/dev
Commit: df4c807808aa36184f2e70d9367c1fc3eedea544
Parents: 169653c
Author: Bruce Robbins <ro...@everychoose-lm.corp.yahoo.com>
Authored: Sun Nov 20 22:44:20 2011 -0800
Committer: Bruce Robbins <ro...@everychoose-lm.corp.yahoo.com>
Committed: Sun Nov 20 22:44:20 2011 -0800

----------------------------------------------------------------------
 .../io/s4/tools/loadgenerator/LoadGenerator.java   |  395 ---------------
 .../main/java/io/s4/tools/loadgenerator/Pacer.java |  131 -----
 .../s4/tools/loadgenerator/LoadGenerator.java      |  395 +++++++++++++++
 .../org/apache/s4/tools/loadgenerator/Pacer.java   |  131 +++++
 4 files changed, 526 insertions(+), 526 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/df4c8078/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/LoadGenerator.java
----------------------------------------------------------------------
diff --git a/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/LoadGenerator.java b/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/LoadGenerator.java
deleted file mode 100644
index 534e67a..0000000
--- a/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/LoadGenerator.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.tools.loadgenerator;
-
-import io.s4.client.Driver;
-import io.s4.client.Message;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public class LoadGenerator {
-
-    public static void main(String args[]) {
-        Options options = new Options();
-        boolean warmUp = false;
-
-        options.addOption(OptionBuilder.withArgName("rate")
-                                       .hasArg()
-                                       .withDescription("Rate (events per second)")
-                                       .create("r"));
-
-        options.addOption(OptionBuilder.withArgName("display_rate")
-                                       .hasArg()
-                                       .withDescription("Display Rate at specified second boundary")
-                                       .create("d"));
-
-        options.addOption(OptionBuilder.withArgName("adapter_address")
-                                       .hasArg()
-                                       .withDescription("Address of client adapter")
-                                       .create("a"));
-
-        options.addOption(OptionBuilder.withArgName("listener_application_name")
-                                       .hasArg()
-                                       .withDescription("Listener application name")
-                                       .create("g"));
-
-        options.addOption(OptionBuilder.withArgName("sleep_overhead")
-                                       .hasArg()
-                                       .withDescription("Sleep overhead")
-                                       .create("o"));
-
-        options.addOption(new Option("w", "Warm-up"));
-
-        CommandLineParser parser = new GnuParser();
-
-        CommandLine line = null;
-        try {
-            // parse the command line arguments
-            line = parser.parse(options, args);
-        } catch (ParseException exp) {
-            // oops, something went wrong
-            System.err.println("Parsing failed.  Reason: " + exp.getMessage());
-            System.exit(1);
-        }
-
-        int expectedRate = 250;
-        if (line.hasOption("r")) {
-            try {
-                expectedRate = Integer.parseInt(line.getOptionValue("r"));
-            } catch (Exception e) {
-                System.err.println("Bad expected rate specified "
-                        + line.getOptionValue("r"));
-                System.exit(1);
-            }
-        }
-
-        int displayRateIntervalSeconds = 20;
-        if (line.hasOption("d")) {
-            try {
-                displayRateIntervalSeconds = Integer.parseInt(line.getOptionValue("d"));
-            } catch (Exception e) {
-                System.err.println("Bad display rate value specified "
-                        + line.getOptionValue("d"));
-                System.exit(1);
-            }
-        }
-
-        int updateFrequency = 0;
-        if (line.hasOption("f")) {
-            try {
-                updateFrequency = Integer.parseInt(line.getOptionValue("f"));
-            } catch (Exception e) {
-                System.err.println("Bad query udpdate frequency specified "
-                        + line.getOptionValue("f"));
-                System.exit(1);
-            }
-            System.out.printf("Update frequency is %d\n", updateFrequency);
-        }
-
-        String clientAdapterAddress = null;
-        String clientAdapterHost = null;
-        int clientAdapterPort = -1;
-        if (line.hasOption("a")) {
-            clientAdapterAddress = line.getOptionValue("a");
-            String[] parts = clientAdapterAddress.split(":");
-            if (parts.length != 2) {
-                System.err.println("Bad adapter address specified "
-                        + clientAdapterAddress);
-                System.exit(1);
-            }
-            clientAdapterHost = parts[0];
-            
-            try {
-                clientAdapterPort = Integer.parseInt(parts[1]);
-            }
-            catch (NumberFormatException nfe) {
-                System.err.println("Bad adapter address specified "
-                        + clientAdapterAddress);
-                System.exit(1);                
-            }
-        }
-
-        long sleepOverheadMicros = -1;
-        if (line.hasOption("o")) {
-            try {
-                sleepOverheadMicros = Long.parseLong(line.getOptionValue("o"));
-            } catch (NumberFormatException e) {
-                System.err.println("Bad sleep overhead specified "
-                        + line.getOptionValue("o"));
-                System.exit(1);
-            }
-            System.out.printf("Specified sleep overhead is %d\n",
-                              sleepOverheadMicros);
-        }
-
-        if (line.hasOption("w")) {
-            warmUp = true;
-        }
-
-        List loArgs = line.getArgList();
-        if (loArgs.size() < 1) {
-            System.err.println("No input file specified");
-            System.exit(1);
-        }
-
-        String inputFilename = (String) loArgs.get(0);
-
-        LoadGenerator loadGenerator = new LoadGenerator();
-        loadGenerator.setInputFilename(inputFilename);
-        loadGenerator.setDisplayRateInterval(displayRateIntervalSeconds);
-        loadGenerator.setExpectedRate(expectedRate);
-        loadGenerator.setClientAdapterHost(clientAdapterHost);
-        loadGenerator.setClientAdapterPort(clientAdapterPort);
-        loadGenerator.run();
-
-        System.exit(0);
-    }
-
-    private String inputFilename;
-    private int emitCount;
-    private int displayRateInterval = 0;
-    private int expectedRate = 200;
-    private String clientAdapterHost = null;
-    private int clientAdapterPort = -1;
-    
-    private int adjustedExpectedRate = 1;
-    private Map<Integer, EventTypeInfo> eventTypeInfoMap = new HashMap<Integer, EventTypeInfo>();
-    private Driver driver;
-    private boolean isConnected;
-
-    public int getEmitCount() {
-        return emitCount;
-    }
-
-    public void setInputFilename(String inputFilename) {
-        this.inputFilename = inputFilename;
-    }
-
-    public void setDisplayRateInterval(int displayRateInterval) {
-        this.displayRateInterval = displayRateInterval;
-    }
-
-    public void setExpectedRate(int expectedRate) {
-        this.expectedRate = expectedRate;
-    }
-
-    public void setClientAdapterHost(String clientAdapterHost) {
-        this.clientAdapterHost = clientAdapterHost;
-    }
-
-    public void setClientAdapterPort(int clientAdapterPort) {
-        this.clientAdapterPort = clientAdapterPort;
-    }
-
-    public LoadGenerator() {
-        
-    }
-
-    public void run() {
-        // for now, no warm-up mechanism
-        adjustedExpectedRate = expectedRate;
-
-        long intervalStart = 0;
-        int emitCountStart = 0;
-
-        BufferedReader br = null;
-        Reader inputReader = null;
-        try {
-            if (!connect()) {
-                System.err.println("Failed to initialize client adapter driver");
-                return;
-            }
-            
-            if (inputFilename.equals("-")) {
-                inputReader = new InputStreamReader(System.in);
-            } else {
-                inputReader = new FileReader(inputFilename);
-            }
-            br = new BufferedReader(inputReader);
-            String inputLine = null;
-            boolean firstLine = true;
-            
-            Pacer pacer = new Pacer(adjustedExpectedRate);
-            while ((inputLine = br.readLine()) != null) {
-                if (firstLine) {
-                    JSONObject jsonRecord = new JSONObject(inputLine);
-                    createEventTypeInfo(jsonRecord);
-                    System.out.println(eventTypeInfoMap);
-                    if (eventTypeInfoMap.size() == 0) {
-                        return;
-                    }
-                    firstLine = false;
-                    continue;
-                }
-                
-                pacer.startCycle();
-
-                try {
-                    JSONObject jsonRecord = new JSONObject(inputLine);
-                    int classIndex = jsonRecord.getInt("_index");
-                    EventTypeInfo eventTypeInfo = eventTypeInfoMap.get(classIndex);
-                      
-                    if (eventTypeInfo == null) {
-                        System.err.printf("Invalid _index value %d\n",
-                                          classIndex);
-                        return;
-                    }
-                    
-                    Message message = new Message(eventTypeInfo.getStreamName(), eventTypeInfo.getClassName(), inputLine);
-                    sendMessage(message);
-                    emitCount++;
-                } catch (JSONException je) {
-                    je.printStackTrace();
-                    System.err.printf("Bad input data %s\n", inputLine);
-                    continue;
-                }
-
-                // if it's time, display the actual emit rate
-                if (intervalStart == 0) {
-                    intervalStart = System.currentTimeMillis();
-                } else {
-                    long interval = System.currentTimeMillis() - intervalStart;
-                    if (interval >= (displayRateInterval * 1000)) {
-                        double rate = (emitCount - emitCountStart)
-                                / (interval / 1000.0);
-                        System.out.println("Rate is " + rate);
-                        intervalStart = System.currentTimeMillis();
-                        emitCountStart = emitCount;
-                    }
-                }
-
-                pacer.endCycle();
-                pacer.maintainPace();
-            }
-            System.out.printf("Emitted %d events\n", emitCount);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        } finally {
-            try {
-                br.close();
-            } catch (Exception e) {
-            }
-            try {
-                inputReader.close();
-            } catch (Exception e) {
-            }
-            try {
-                driver.disconnect();
-            } catch (Exception e) {
-            }
-        }
-    }
-    
-    public boolean connect() {
-        isConnected = false;
-        try {
-            System.out.println("Connecting...");
-            driver = new Driver(clientAdapterHost, clientAdapterPort);
-            boolean isInitialized = driver.init();
-            isConnected = isInitialized & driver.connect();
-            System.out.println("Connection made: " + isConnected);
-            return isConnected;
-        }
-        catch (IOException ioe) {
-            System.out.println("Connection made: " + isConnected);
-            return isConnected;
-        }
-        catch (NullPointerException npe) {
-            // there's a bug in the driver that causes a null pointer exception if
-            // if the target server is down
-            System.out.println("Connection made: " + isConnected);
-            return isConnected;
-        }
-    }
-    
-    public boolean sendMessage(Message message) {
-        final int MAX_RETRY = 5;
-        boolean sent = false;
-        int backoff = 10;
-        for (int retries = 0; retries < MAX_RETRY; retries++) {
-            try {
-                if (!isConnected) {
-                    throw new IOException("Driver not connected");
-                }
-                driver.send(message);
-                sent = true;
-                break;
-            }
-            catch (IOException ioe) {
-                try {
-                    System.out.printf("Sleeping for %f seconds\n", backoff/1000.0);
-                    Thread.sleep(backoff);
-                } catch (InterruptedException ie) {}
-                backoff = backoff*5;
-                connect();
-            }
-        }
-        return sent;
-    }
-
-    @SuppressWarnings("unchecked")
-    public void createEventTypeInfo(JSONObject classInfo) {
-        String className = "";
-        try {
-            for (Iterator it = classInfo.keys(); it.hasNext();) {
-                className = (String) it.next();
-                JSONObject jsonEventTypeInfo = classInfo.getJSONObject(className);
-                int classIndex = (Integer) jsonEventTypeInfo.getInt("classIndex");
-                String streamName = jsonEventTypeInfo.getString("streamName");
-                eventTypeInfoMap.put(classIndex, new EventTypeInfo(className,
-                                                                   streamName));
-            }
-        } catch (JSONException je) {
-            je.printStackTrace();
-        }
-    }
-
-    static class EventTypeInfo {
-        private String className;
-        private String streamName;
-
-        public EventTypeInfo(String clazz, String streamName) {
-            this.className = clazz;
-            this.streamName = streamName;
-        }
-
-        public String getClassName() {
-            return className;
-        }
-
-        public String getStreamName() {
-            return streamName;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/df4c8078/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/Pacer.java
----------------------------------------------------------------------
diff --git a/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/Pacer.java b/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/Pacer.java
deleted file mode 100644
index 7aba882..0000000
--- a/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/Pacer.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.tools.loadgenerator;;
-
-public class Pacer {
-    private long sleepOverheadMicros = -1;
-    private int expectedRate = -1;
-    private int adjustedExpectedRate = 1;
-    private long startTime;
-    private int cycleCount = 0;
-
-    private static int PROCESS_TIME_LIST_MAX_SIZE = 15;
-    private long[] processTimes = new long[PROCESS_TIME_LIST_MAX_SIZE];
-    private int processTimePointer = 0;
-    private long[] rateInfo = new long[] {0,100};
-    
-    public Pacer(int expectedRate) {
-        this.expectedRate = expectedRate;
-        this.adjustedExpectedRate = expectedRate; // the same for now
-
-        if (sleepOverheadMicros == -1) {
-            // calculate sleep overhead
-            long totalSleepOverhead = 0;
-            for (int i = 0; i < 50; i++) {
-                long startTime = System.nanoTime();
-                try {
-                    Thread.sleep(1);
-                } catch (InterruptedException ie) {
-                }
-                totalSleepOverhead += (System.nanoTime() - startTime)
-                        - (1 * 1000 * 1000);
-            }
-            sleepOverheadMicros = (totalSleepOverhead / 50) / 1000;
-        }
-    }
-    
-    public void startCycle() {
-        startTime = System.nanoTime();
-    }
-    
-    public void endCycle() {
-        processTimes[processTimePointer] = System.nanoTime() - startTime;
-        processTimePointer = (processTimePointer == PROCESS_TIME_LIST_MAX_SIZE - 1) ? 0
-                : processTimePointer + 1;
-        
-        cycleCount++;
-        
-    }
-    
-    public void maintainPace() {   
-        if (cycleCount == 1 || cycleCount % 20 == 0) {
-            rateInfo = getRateInfo(rateInfo);
-        }  
-        if (rateInfo[1] == 0 || cycleCount % rateInfo[1] == 0) {
-            try {
-                Thread.sleep(rateInfo[0]);
-            } catch (InterruptedException ie) {
-            }
-        }
-    }
-    
-    private long[] getRateInfo(long[] rateInfo) {
-        long totalTimeNanos = 0;
-        int entryCount = 0;
-        for (int i = 0; i < processTimes.length; i++) {
-            if (processTimes[i] == Long.MIN_VALUE) {
-                break;
-            }
-            entryCount++;
-            totalTimeNanos += processTimes[i];
-        }
-        long averageTimeMicros = (long) ((totalTimeNanos / (double) entryCount) / 1000.0);
-        // fudge the time for additional overhead
-        averageTimeMicros += (long) (averageTimeMicros * 0.30);
-
-        if (cycleCount % 5000 == 0) {
-            // System.out.println("Average time in micros is " +
-            // averageTimeMicros);
-        }
-
-        long sleepTimeMicros = 0;
-        long millis = 0;
-
-        long timeToMeetRateMicros = adjustedExpectedRate * averageTimeMicros;
-        long leftOver = 1000000 - timeToMeetRateMicros;
-        if (leftOver <= 0) {
-            sleepTimeMicros = 0;
-        } else {
-            sleepTimeMicros = (leftOver / adjustedExpectedRate)
-                    - sleepOverheadMicros;
-        }
-
-        // how many events can be processed in the nanos time?
-        int eventsBeforeSleep = 1;
-        if (sleepTimeMicros < 1000) {
-            // less than 1 millisecond sleep time, so need to stagger sleeps to
-            // emulate such a sleep
-            sleepTimeMicros = 1000 + sleepOverheadMicros;
-            millis = 1;
-            double numNapsDouble = ((double) leftOver / sleepTimeMicros);
-            int numNaps = (int) Math.ceil(numNapsDouble);
-            if (numNaps > 0) {
-                eventsBeforeSleep = adjustedExpectedRate / numNaps;
-            }
-
-            if (leftOver <= 0) {
-                millis = 0;
-                eventsBeforeSleep = 1000;
-            }
-        } else {
-            millis = sleepTimeMicros / 1000;
-        }
-
-        rateInfo[0] = millis;
-        rateInfo[1] = eventsBeforeSleep;
-        return rateInfo;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/df4c8078/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/LoadGenerator.java
----------------------------------------------------------------------
diff --git a/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/LoadGenerator.java b/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/LoadGenerator.java
new file mode 100644
index 0000000..2db0e37
--- /dev/null
+++ b/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/LoadGenerator.java
@@ -0,0 +1,395 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.tools.loadgenerator;
+
+import org.apache.s4.client.Driver;
+import org.apache.s4.client.Message;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class LoadGenerator {
+
+    public static void main(String args[]) {
+        Options options = new Options();
+        boolean warmUp = false;
+
+        options.addOption(OptionBuilder.withArgName("rate")
+                                       .hasArg()
+                                       .withDescription("Rate (events per second)")
+                                       .create("r"));
+
+        options.addOption(OptionBuilder.withArgName("display_rate")
+                                       .hasArg()
+                                       .withDescription("Display Rate at specified second boundary")
+                                       .create("d"));
+
+        options.addOption(OptionBuilder.withArgName("adapter_address")
+                                       .hasArg()
+                                       .withDescription("Address of client adapter")
+                                       .create("a"));
+
+        options.addOption(OptionBuilder.withArgName("listener_application_name")
+                                       .hasArg()
+                                       .withDescription("Listener application name")
+                                       .create("g"));
+
+        options.addOption(OptionBuilder.withArgName("sleep_overhead")
+                                       .hasArg()
+                                       .withDescription("Sleep overhead")
+                                       .create("o"));
+
+        options.addOption(new Option("w", "Warm-up"));
+
+        CommandLineParser parser = new GnuParser();
+
+        CommandLine line = null;
+        try {
+            // parse the command line arguments
+            line = parser.parse(options, args);
+        } catch (ParseException exp) {
+            // oops, something went wrong
+            System.err.println("Parsing failed.  Reason: " + exp.getMessage());
+            System.exit(1);
+        }
+
+        int expectedRate = 250;
+        if (line.hasOption("r")) {
+            try {
+                expectedRate = Integer.parseInt(line.getOptionValue("r"));
+            } catch (Exception e) {
+                System.err.println("Bad expected rate specified "
+                        + line.getOptionValue("r"));
+                System.exit(1);
+            }
+        }
+
+        int displayRateIntervalSeconds = 20;
+        if (line.hasOption("d")) {
+            try {
+                displayRateIntervalSeconds = Integer.parseInt(line.getOptionValue("d"));
+            } catch (Exception e) {
+                System.err.println("Bad display rate value specified "
+                        + line.getOptionValue("d"));
+                System.exit(1);
+            }
+        }
+
+        int updateFrequency = 0;
+        if (line.hasOption("f")) {
+            try {
+                updateFrequency = Integer.parseInt(line.getOptionValue("f"));
+            } catch (Exception e) {
+                System.err.println("Bad query udpdate frequency specified "
+                        + line.getOptionValue("f"));
+                System.exit(1);
+            }
+            System.out.printf("Update frequency is %d\n", updateFrequency);
+        }
+
+        String clientAdapterAddress = null;
+        String clientAdapterHost = null;
+        int clientAdapterPort = -1;
+        if (line.hasOption("a")) {
+            clientAdapterAddress = line.getOptionValue("a");
+            String[] parts = clientAdapterAddress.split(":");
+            if (parts.length != 2) {
+                System.err.println("Bad adapter address specified "
+                        + clientAdapterAddress);
+                System.exit(1);
+            }
+            clientAdapterHost = parts[0];
+            
+            try {
+                clientAdapterPort = Integer.parseInt(parts[1]);
+            }
+            catch (NumberFormatException nfe) {
+                System.err.println("Bad adapter address specified "
+                        + clientAdapterAddress);
+                System.exit(1);                
+            }
+        }
+
+        long sleepOverheadMicros = -1;
+        if (line.hasOption("o")) {
+            try {
+                sleepOverheadMicros = Long.parseLong(line.getOptionValue("o"));
+            } catch (NumberFormatException e) {
+                System.err.println("Bad sleep overhead specified "
+                        + line.getOptionValue("o"));
+                System.exit(1);
+            }
+            System.out.printf("Specified sleep overhead is %d\n",
+                              sleepOverheadMicros);
+        }
+
+        if (line.hasOption("w")) {
+            warmUp = true;
+        }
+
+        List loArgs = line.getArgList();
+        if (loArgs.size() < 1) {
+            System.err.println("No input file specified");
+            System.exit(1);
+        }
+
+        String inputFilename = (String) loArgs.get(0);
+
+        LoadGenerator loadGenerator = new LoadGenerator();
+        loadGenerator.setInputFilename(inputFilename);
+        loadGenerator.setDisplayRateInterval(displayRateIntervalSeconds);
+        loadGenerator.setExpectedRate(expectedRate);
+        loadGenerator.setClientAdapterHost(clientAdapterHost);
+        loadGenerator.setClientAdapterPort(clientAdapterPort);
+        loadGenerator.run();
+
+        System.exit(0);
+    }
+
+    private String inputFilename;
+    private int emitCount;
+    private int displayRateInterval = 0;
+    private int expectedRate = 200;
+    private String clientAdapterHost = null;
+    private int clientAdapterPort = -1;
+    
+    private int adjustedExpectedRate = 1;
+    private Map<Integer, EventTypeInfo> eventTypeInfoMap = new HashMap<Integer, EventTypeInfo>();
+    private Driver driver;
+    private boolean isConnected;
+
+    public int getEmitCount() {
+        return emitCount;
+    }
+
+    public void setInputFilename(String inputFilename) {
+        this.inputFilename = inputFilename;
+    }
+
+    public void setDisplayRateInterval(int displayRateInterval) {
+        this.displayRateInterval = displayRateInterval;
+    }
+
+    public void setExpectedRate(int expectedRate) {
+        this.expectedRate = expectedRate;
+    }
+
+    public void setClientAdapterHost(String clientAdapterHost) {
+        this.clientAdapterHost = clientAdapterHost;
+    }
+
+    public void setClientAdapterPort(int clientAdapterPort) {
+        this.clientAdapterPort = clientAdapterPort;
+    }
+
+    public LoadGenerator() {
+        
+    }
+
+    /**
+     * Reads JSON records from the configured input and emits them through the
+     * driver, pacing the loop with a {@link Pacer} built from the expected rate.
+     * <p>
+     * Input comes from standard input when the input file name is "-",
+     * otherwise from the named file. The first line is handed to
+     * {@link #createEventTypeInfo(JSONObject)}; every following line is one
+     * event whose <code>_index</code> value selects the event type. Lines that
+     * are not valid JSON are reported and skipped. An unknown
+     * <code>_index</code> stops the run.
+     * <p>
+     * Only messages that {@link #sendMessage(Message)} reports as sent are
+     * counted, so the displayed rate and the final total reflect what was
+     * actually handed to the driver.
+     *
+     * @throws RuntimeException wrapping any exception raised while reading
+     *         the input or emitting events
+     */
+    public void run() {
+        // for now, no warm-up mechanism
+        adjustedExpectedRate = expectedRate;
+
+        long intervalStart = 0;
+        int emitCountStart = 0;
+
+        BufferedReader br = null;
+        Reader inputReader = null;
+        try {
+            if (!connect()) {
+                System.err.println("Failed to initialize client adapter driver");
+                return;
+            }
+
+            if (inputFilename.equals("-")) {
+                inputReader = new InputStreamReader(System.in);
+            } else {
+                inputReader = new FileReader(inputFilename);
+            }
+            br = new BufferedReader(inputReader);
+            String inputLine = null;
+            boolean firstLine = true;
+
+            Pacer pacer = new Pacer(adjustedExpectedRate);
+            while ((inputLine = br.readLine()) != null) {
+                if (firstLine) {
+                    // the first record describes the event types, it is not an event
+                    JSONObject jsonRecord = new JSONObject(inputLine);
+                    createEventTypeInfo(jsonRecord);
+                    System.out.println(eventTypeInfoMap);
+                    if (eventTypeInfoMap.size() == 0) {
+                        System.err.println("No event type information found in first input line");
+                        return;
+                    }
+                    firstLine = false;
+                    continue;
+                }
+
+                pacer.startCycle();
+
+                try {
+                    JSONObject jsonRecord = new JSONObject(inputLine);
+                    int classIndex = jsonRecord.getInt("_index");
+                    EventTypeInfo eventTypeInfo = eventTypeInfoMap.get(classIndex);
+
+                    if (eventTypeInfo == null) {
+                        System.err.printf("Invalid _index value %d\n",
+                                          classIndex);
+                        return;
+                    }
+
+                    Message message = new Message(eventTypeInfo.getStreamName(), eventTypeInfo.getClassName(), inputLine);
+                    if (sendMessage(message)) {
+                        emitCount++;
+                    } else {
+                        // all retries failed: do not count the event as emitted
+                        System.err.printf("Failed to send event %s\n", inputLine);
+                    }
+                } catch (JSONException je) {
+                    je.printStackTrace();
+                    System.err.printf("Bad input data %s\n", inputLine);
+                    continue;
+                }
+
+                // if it's time, display the actual emit rate
+                if (intervalStart == 0) {
+                    intervalStart = System.currentTimeMillis();
+                } else {
+                    long interval = System.currentTimeMillis() - intervalStart;
+                    if (interval >= (displayRateInterval * 1000)) {
+                        double rate = (emitCount - emitCountStart)
+                                / (interval / 1000.0);
+                        System.out.println("Rate is " + rate);
+                        intervalStart = System.currentTimeMillis();
+                        emitCountStart = emitCount;
+                    }
+                }
+
+                pacer.endCycle();
+                pacer.maintainPace();
+            }
+            System.out.printf("Emitted %d events\n", emitCount);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            // best-effort cleanup: a failure to close must not mask the outcome of the run
+            if (br != null) {
+                try {
+                    br.close();
+                } catch (Exception ignored) {
+                    // nothing useful can be done at this point
+                }
+            }
+            if (inputReader != null) {
+                try {
+                    inputReader.close();
+                } catch (Exception ignored) {
+                    // nothing useful can be done at this point
+                }
+            }
+            if (driver != null) {
+                try {
+                    driver.disconnect();
+                } catch (Exception ignored) {
+                    // nothing useful can be done at this point
+                }
+            }
+        }
+    }
+    
+    /**
+     * Creates a new driver for the configured client adapter host and port,
+     * initializes it and, only if initialization succeeded, connects it.
+     * <p>
+     * The outcome is stored in the connected flag and printed to standard
+     * output. An {@link IOException} is reported on standard error and
+     * treated as a failed connection.
+     *
+     * @return true if the driver was initialized and connected, false otherwise
+     */
+    public boolean connect() {
+        isConnected = false;
+        try {
+            System.out.println("Connecting...");
+            driver = new Driver(clientAdapterHost, clientAdapterPort);
+            // short-circuit: do not try to connect a driver that failed to initialize
+            isConnected = driver.init() && driver.connect();
+        }
+        catch (IOException ioe) {
+            System.err.println("Connection attempt failed: " + ioe);
+        }
+        catch (NullPointerException npe) {
+            // there's a bug in the driver that causes a null pointer exception
+            // if the target server is down; treat it as a failed connection
+        }
+        System.out.println("Connection made: " + isConnected);
+        return isConnected;
+    }
+    
+    /**
+     * Sends a message through the driver, retrying after failures.
+     * <p>
+     * Up to five attempts are made. After each failed attempt the method
+     * sleeps (10 ms at first, multiplied by five after every failure) and then
+     * calls {@link #connect()} to re-establish the connection. If the thread
+     * is interrupted while sleeping, the interrupt status is restored and the
+     * method gives up immediately instead of continuing to retry.
+     *
+     * @param message the message to send
+     * @return true if the driver accepted the message, false if every attempt
+     *         failed or the thread was interrupted
+     */
+    public boolean sendMessage(Message message) {
+        final int MAX_RETRY = 5;
+        int backoff = 10;
+        for (int retries = 0; retries < MAX_RETRY; retries++) {
+            try {
+                if (!isConnected) {
+                    throw new IOException("Driver not connected");
+                }
+                driver.send(message);
+                return true;
+            }
+            catch (IOException ioe) {
+                try {
+                    System.out.printf("Sleeping for %f seconds\n", backoff/1000.0);
+                    Thread.sleep(backoff);
+                } catch (InterruptedException ie) {
+                    // preserve the interrupt for the caller and stop retrying
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+                backoff = backoff*5;
+                connect();
+            }
+        }
+        return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void createEventTypeInfo(JSONObject classInfo) {
+        String className = "";
+        try {
+            for (Iterator it = classInfo.keys(); it.hasNext();) {
+                className = (String) it.next();
+                JSONObject jsonEventTypeInfo = classInfo.getJSONObject(className);
+                int classIndex = (Integer) jsonEventTypeInfo.getInt("classIndex");
+                String streamName = jsonEventTypeInfo.getString("streamName");
+                eventTypeInfoMap.put(classIndex, new EventTypeInfo(className,
+                                                                   streamName));
+            }
+        } catch (JSONException je) {
+            je.printStackTrace();
+        }
+    }
+
+    /**
+     * Immutable pairing of an event class name with the name of the stream
+     * its events are sent on.
+     */
+    static class EventTypeInfo {
+        private final String className;
+        private final String streamName;
+
+        /**
+         * @param clazz      the event class name
+         * @param streamName the stream name
+         */
+        public EventTypeInfo(String clazz, String streamName) {
+            this.streamName = streamName;
+            this.className = clazz;
+        }
+
+        /** @return the event class name given at construction */
+        public String getClassName() {
+            return this.className;
+        }
+
+        /** @return the stream name given at construction */
+        public String getStreamName() {
+            return this.streamName;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/df4c8078/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/Pacer.java
----------------------------------------------------------------------
diff --git a/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/Pacer.java b/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/Pacer.java
new file mode 100644
index 0000000..d85f6ec
--- /dev/null
+++ b/s4-tools/loadgenerator/src/main/java/org/apache/s4/tools/loadgenerator/Pacer.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.tools.loadgenerator;;
+
+/**
+ * Slows a loop down so that it runs at roughly a target number of cycles per
+ * second.
+ * <p>
+ * Intended use per iteration: call {@link #startCycle()}, do the work, call
+ * {@link #endCycle()}, then {@link #maintainPace()}. The pacer keeps the
+ * durations of the most recent cycles and periodically derives from their
+ * average how long to sleep and how many cycles to let pass between sleeps.
+ * <p>
+ * The expected rate should be positive: a rate of zero leads to a division by
+ * zero when the pace is first computed.
+ */
+public class Pacer {
+    private static final int PROCESS_TIME_LIST_MAX_SIZE = 15;
+    /** Number of one-millisecond sleeps used to measure the sleep overhead. */
+    private static final int SLEEP_CALIBRATION_ROUNDS = 50;
+    /** Marks a slot of the process-time buffer that holds no measurement yet. */
+    private static final long UNUSED_SLOT = Long.MIN_VALUE;
+
+    private long sleepOverheadMicros = -1;
+    private int expectedRate = -1;
+    private int adjustedExpectedRate = 1;
+    private long startTime;
+    private int cycleCount = 0;
+
+    /** Ring buffer of recent cycle durations in nanoseconds. */
+    private long[] processTimes = new long[PROCESS_TIME_LIST_MAX_SIZE];
+    private int processTimePointer = 0;
+    /** [0] = milliseconds to sleep, [1] = number of cycles between sleeps. */
+    private long[] rateInfo = new long[] {0,100};
+
+    /**
+     * Creates a pacer and measures how much longer than requested a
+     * one-millisecond sleep takes on average; that overhead is later
+     * subtracted from the computed sleep times.
+     *
+     * @param expectedRate target number of cycles per second
+     */
+    public Pacer(int expectedRate) {
+        this.expectedRate = expectedRate;
+        this.adjustedExpectedRate = expectedRate; // the same for now
+
+        // mark every slot as empty so that averages only cover real measurements
+        for (int i = 0; i < processTimes.length; i++) {
+            processTimes[i] = UNUSED_SLOT;
+        }
+
+        // calculate sleep overhead
+        long totalSleepOverhead = 0;
+        for (int i = 0; i < SLEEP_CALIBRATION_ROUNDS; i++) {
+            long sleepStart = System.nanoTime();
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException ignored) {
+                // calibration is best effort; a shortened sleep only skews the estimate
+            }
+            totalSleepOverhead += (System.nanoTime() - sleepStart)
+                    - (1 * 1000 * 1000);
+        }
+        sleepOverheadMicros = (totalSleepOverhead / SLEEP_CALIBRATION_ROUNDS) / 1000;
+    }
+
+    /**
+     * Records the start time of the current cycle.
+     */
+    public void startCycle() {
+        startTime = System.nanoTime();
+    }
+
+    /**
+     * Stores the duration of the current cycle in the ring buffer, overwriting
+     * the oldest entry once the buffer is full, and counts the cycle.
+     */
+    public void endCycle() {
+        processTimes[processTimePointer] = System.nanoTime() - startTime;
+        processTimePointer = (processTimePointer == PROCESS_TIME_LIST_MAX_SIZE - 1) ? 0
+                : processTimePointer + 1;
+
+        cycleCount++;
+    }
+
+    /**
+     * Sleeps when the current cycle is due for a pause. The sleep parameters
+     * are recomputed on the first cycle and on every twentieth cycle.
+     */
+    public void maintainPace() {
+        if (cycleCount == 1 || cycleCount % 20 == 0) {
+            rateInfo = getRateInfo(rateInfo);
+        }
+        if (rateInfo[1] == 0 || cycleCount % rateInfo[1] == 0) {
+            try {
+                Thread.sleep(rateInfo[0]);
+            } catch (InterruptedException ignored) {
+                // a shortened pause only makes this cycle finish early
+            }
+        }
+    }
+
+    /**
+     * Computes the sleep duration and the number of cycles between sleeps
+     * needed to approach the expected rate, based on the average of the
+     * recorded cycle durations.
+     *
+     * @param rateInfo two-element array that receives the result
+     * @return the same array, with [0] set to the sleep time in milliseconds
+     *         and [1] to the number of cycles between sleeps
+     */
+    private long[] getRateInfo(long[] rateInfo) {
+        long totalTimeNanos = 0;
+        int entryCount = 0;
+        for (int i = 0; i < processTimes.length; i++) {
+            // slots are filled in order, so the first empty one ends the data
+            if (processTimes[i] == UNUSED_SLOT) {
+                break;
+            }
+            entryCount++;
+            totalTimeNanos += processTimes[i];
+        }
+        // with no measurement yet, assume the work itself takes no time
+        long averageTimeMicros = (entryCount == 0) ? 0
+                : (long) ((totalTimeNanos / (double) entryCount) / 1000.0);
+        // fudge the time for additional overhead
+        averageTimeMicros += (long) (averageTimeMicros * 0.30);
+
+        long sleepTimeMicros = 0;
+        long millis = 0;
+
+        // time per second spent on the work itself, and what remains for sleeping
+        long timeToMeetRateMicros = adjustedExpectedRate * averageTimeMicros;
+        long leftOver = 1000000 - timeToMeetRateMicros;
+        if (leftOver <= 0) {
+            sleepTimeMicros = 0;
+        } else {
+            sleepTimeMicros = (leftOver / adjustedExpectedRate)
+                    - sleepOverheadMicros;
+        }
+
+        // how many events can be processed between two sleeps?
+        int eventsBeforeSleep = 1;
+        if (sleepTimeMicros < 1000) {
+            // less than 1 millisecond sleep time, so need to stagger sleeps to
+            // emulate such a sleep
+            sleepTimeMicros = 1000 + sleepOverheadMicros;
+            millis = 1;
+            double numNapsDouble = ((double) leftOver / sleepTimeMicros);
+            int numNaps = (int) Math.ceil(numNapsDouble);
+            if (numNaps > 0) {
+                eventsBeforeSleep = adjustedExpectedRate / numNaps;
+            }
+
+            if (leftOver <= 0) {
+                // the work alone already uses the whole second: effectively never sleep
+                millis = 0;
+                eventsBeforeSleep = 1000;
+            }
+        } else {
+            millis = sleepTimeMicros / 1000;
+        }
+
+        rateInfo[0] = millis;
+        rateInfo[1] = eventsBeforeSleep;
+        return rateInfo;
+    }
+}