You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:16 UTC

[40/50] [abbrv] Rename packages in preparation for move to Apache

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/LoadGenerator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/LoadGenerator.java b/s4-core/src/main/java/org/apache/s4/util/LoadGenerator.java
new file mode 100644
index 0000000..b94f086
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/LoadGenerator.java
@@ -0,0 +1,640 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.emitter.CommLayerEmitter;
+import org.apache.s4.emitter.EventEmitter;
+import org.apache.s4.schema.Schema;
+import org.apache.s4.schema.Schema.Property;
+import org.apache.s4.serialize.KryoSerDeser;
+import org.apache.s4.serialize.SerializerDeserializer;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.lang.reflect.Array;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class LoadGenerator {
+
+    public static void main(String args[]) {
+        Options options = new Options();
+        boolean warmUp = false;
+
+        options.addOption(OptionBuilder.withArgName("rate")
+                                       .hasArg()
+                                       .withDescription("Rate (events per second)")
+                                       .create("r"));
+
+        options.addOption(OptionBuilder.withArgName("display_rate")
+                                       .hasArg()
+                                       .withDescription("Display Rate at specified second boundary")
+                                       .create("d"));
+
+        options.addOption(OptionBuilder.withArgName("start_boundary")
+                                       .hasArg()
+                                       .withDescription("Start boundary in seconds")
+                                       .create("b"));
+
+        options.addOption(OptionBuilder.withArgName("run_for")
+                                       .hasArg()
+                                       .withDescription("Run for a specified number of seconds")
+                                       .create("x"));
+
+        options.addOption(OptionBuilder.withArgName("cluster_manager")
+                                       .hasArg()
+                                       .withDescription("Cluster manager")
+                                       .create("z"));
+
+        options.addOption(OptionBuilder.withArgName("sender_application_name")
+                                       .hasArg()
+                                       .withDescription("Sender application name")
+                                       .create("a"));
+
+        options.addOption(OptionBuilder.withArgName("listener_application_name")
+                                       .hasArg()
+                                       .withDescription("Listener application name")
+                                       .create("g"));
+
+        options.addOption(OptionBuilder.withArgName("sleep_overhead")
+                                       .hasArg()
+                                       .withDescription("Sleep overhead")
+                                       .create("o"));
+
+        options.addOption(new Option("w", "Warm-up"));
+
+        CommandLineParser parser = new GnuParser();
+
+        CommandLine line = null;
+        try {
+            // parse the command line arguments
+            line = parser.parse(options, args);
+        } catch (ParseException exp) {
+            // oops, something went wrong
+            System.err.println("Parsing failed.  Reason: " + exp.getMessage());
+            System.exit(1);
+        }
+
+        int expectedRate = 250;
+        if (line.hasOption("r")) {
+            try {
+                expectedRate = Integer.parseInt(line.getOptionValue("r"));
+            } catch (Exception e) {
+                System.err.println("Bad expected rate specified "
+                        + line.getOptionValue("r"));
+                System.exit(1);
+            }
+        }
+
+        int displayRateIntervalSeconds = 20;
+        if (line.hasOption("d")) {
+            try {
+                displayRateIntervalSeconds = Integer.parseInt(line.getOptionValue("d"));
+            } catch (Exception e) {
+                System.err.println("Bad display rate value specified "
+                        + line.getOptionValue("d"));
+                System.exit(1);
+            }
+        }
+
+        int startBoundary = 2;
+        if (line.hasOption("b")) {
+            try {
+                startBoundary = Integer.parseInt(line.getOptionValue("b"));
+            } catch (Exception e) {
+                System.err.println("Bad start boundary value specified "
+                        + line.getOptionValue("b"));
+                System.exit(1);
+            }
+        }
+
+        int updateFrequency = 0;
+        if (line.hasOption("f")) {
+            try {
+                updateFrequency = Integer.parseInt(line.getOptionValue("f"));
+            } catch (Exception e) {
+                System.err.println("Bad query udpdate frequency specified "
+                        + line.getOptionValue("f"));
+                System.exit(1);
+            }
+            System.out.printf("Update frequency is %d\n", updateFrequency);
+        }
+
+        int runForTime = 0;
+        if (line.hasOption("x")) {
+            try {
+                runForTime = Integer.parseInt(line.getOptionValue("x"));
+            } catch (Exception e) {
+                System.err.println("Bad run for time specified "
+                        + line.getOptionValue("x"));
+                System.exit(1);
+            }
+            System.out.printf("Run for time is %d\n", runForTime);
+        }
+
+        String clusterManagerAddress = null;
+        if (line.hasOption("z")) {
+            clusterManagerAddress = line.getOptionValue("z");
+        }
+
+        String senderApplicationName = null;
+        if (line.hasOption("a")) {
+            senderApplicationName = line.getOptionValue("a");
+        }
+
+        String listenerApplicationName = null;
+        if (line.hasOption("a")) {
+            listenerApplicationName = line.getOptionValue("g");
+        }
+
+        if (listenerApplicationName == null) {
+            listenerApplicationName = senderApplicationName;
+        }
+
+        long sleepOverheadMicros = -1;
+        if (line.hasOption("o")) {
+            try {
+                sleepOverheadMicros = Long.parseLong(line.getOptionValue("o"));
+            } catch (NumberFormatException e) {
+                System.err.println("Bad sleep overhead specified "
+                        + line.getOptionValue("o"));
+                System.exit(1);
+            }
+            System.out.printf("Specified sleep overhead is %d\n",
+                              sleepOverheadMicros);
+        }
+
+        if (line.hasOption("w")) {
+            warmUp = true;
+        }
+
+        List loArgs = line.getArgList();
+        if (loArgs.size() < 1) {
+            System.err.println("No input file specified");
+            System.exit(1);
+        }
+
+        String inputFilename = (String) loArgs.get(0);
+
+        EventEmitter emitter = null;
+
+        SerializerDeserializer serDeser = new KryoSerDeser();
+
+        CommLayerEmitter clEmitter = new CommLayerEmitter();
+        clEmitter.setAppName(senderApplicationName);
+        clEmitter.setListenerAppName(listenerApplicationName);
+        clEmitter.setClusterManagerAddress(clusterManagerAddress);
+        clEmitter.setSenderId(String.valueOf(System.currentTimeMillis() / 1000));
+        clEmitter.setSerDeser(serDeser);
+        clEmitter.init();
+        emitter = clEmitter;
+
+        long endTime = 0;
+        if (runForTime > 0) {
+            endTime = System.currentTimeMillis() + (runForTime * 1000);
+        }
+
+        LoadGenerator loadGenerator = new LoadGenerator();
+        loadGenerator.setInputFilename(inputFilename);
+        loadGenerator.setEventEmitter(clEmitter);
+        loadGenerator.setDisplayRateInterval(displayRateIntervalSeconds);
+        loadGenerator.setExpectedRate(expectedRate);
+        loadGenerator.run();
+
+        System.exit(0);
+    }
+
+    // Destination for generated events; run() asks it for the node count and
+    // emits each event to a randomly chosen partition.
+    private EventEmitter eventEmitter;
+    // Path of the input file; "-" makes run() read standard input.
+    private String inputFilename;
+    // Number of events emitted so far by run().
+    private int emitCount;
+    // Seconds between "Rate is ..." reports printed by run().
+    private int displayRateInterval = 0;
+
+    // Requested rate; run() copies it into adjustedExpectedRate.
+    private int expectedRate = 200;
+    // Rate actually used by getRateInfo() when computing sleep times.
+    private int adjustedExpectedRate = 1;
+    // Measured or configured overhead of a sleep call, in microseconds;
+    // -1 means "not yet known" and triggers measurement in the constructor.
+    private long sleepOverheadMicros = -1;
+    // Capacity of the circular buffer of per-event processing times.
+    private static int PROCESS_TIME_LIST_MAX_SIZE = 15;
+    // Circular buffer of per-event processing times, in nanoseconds.
+    private long[] processTimes = new long[PROCESS_TIME_LIST_MAX_SIZE];
+    // Index of the next slot of processTimes to overwrite.
+    private int processTimePointer = 0;
+    // Event type descriptions keyed by the "classIndex" given in the first
+    // input line; filled by createEventTypeInfo().
+    private Map<Integer, EventTypeInfo> eventTypeInfoMap = new HashMap<Integer, EventTypeInfo>();
+
+    /**
+     * @return the number of events emitted so far
+     */
+    public int getEmitCount() {
+        return emitCount;
+    }
+
+    /**
+     * @param eventEmitter
+     *            emitter that run() sends the generated events to
+     */
+    public void setEventEmitter(EventEmitter eventEmitter) {
+        this.eventEmitter = eventEmitter;
+    }
+
+    /**
+     * @param inputFilename
+     *            file to read events from, or "-" for standard input
+     */
+    public void setInputFilename(String inputFilename) {
+        this.inputFilename = inputFilename;
+    }
+
+    /**
+     * @param displayRateInterval
+     *            number of seconds between reports of the observed emit rate
+     */
+    public void setDisplayRateInterval(int displayRateInterval) {
+        this.displayRateInterval = displayRateInterval;
+    }
+
+    /**
+     * Overrides the sleep overhead measured by the constructor.
+     *
+     * @param sleepOverheadMicros
+     *            overhead of a sleep call, in microseconds
+     */
+    public void setSleepOverheadMicros(long sleepOverheadMicros) {
+        this.sleepOverheadMicros = sleepOverheadMicros;
+    }
+
+    /**
+     * @param expectedRate
+     *            target emit rate; NOTE(review): presumably events per second,
+     *            as described by the -r command-line option
+     */
+    public void setExpectedRate(int expectedRate) {
+        this.expectedRate = expectedRate;
+    }
+
+    // Source of randomness for choosing the partition each event is sent to.
+    private Random rand = new Random(System.currentTimeMillis());
+
+    public LoadGenerator() {
+        if (sleepOverheadMicros == -1) {
+            // calculate sleep overhead
+            long totalSleepOverhead = 0;
+            for (int i = 0; i < 50; i++) {
+                long startTime = System.nanoTime();
+                try {
+                    Thread.sleep(1);
+                } catch (InterruptedException ie) {
+                }
+                totalSleepOverhead += (System.nanoTime() - startTime)
+                        - (1 * 1000 * 1000);
+            }
+            sleepOverheadMicros = (totalSleepOverhead / 50) / 1000;
+        }
+        System.out.println("Sleep overhead is " + sleepOverheadMicros);
+    }
+
+    /**
+     * Replays the input at the expected rate.
+     * <p>
+     * The first input line is a JSON object describing the event types (see
+     * {@link #createEventTypeInfo(JSONObject)}); the method returns at once if
+     * it yields no usable type. Every following line is a JSON record whose
+     * "_index" field selects the event type. Each record is converted to an
+     * event object, wrapped, and emitted to a randomly chosen partition. Lines
+     * that cannot be converted are reported on standard error and skipped; an
+     * unknown "_index" value ends the run.
+     * <p>
+     * After each emit the method records the processing time, periodically
+     * recomputes the sleep schedule, prints the observed rate every
+     * displayRateInterval seconds and sleeps as dictated by the schedule.
+     *
+     * @throws RuntimeException
+     *             wrapping any exception raised while opening or reading the
+     *             input, parsing the first line, or emitting
+     */
+    public void run() {
+        // for now, no warm-up mechanism
+        adjustedExpectedRate = expectedRate;
+
+        long startTime = 0;
+        long intervalStart = 0;
+        int emitCountStart = 0;
+        // rateInfo[0]: milliseconds to sleep; rateInfo[1]: events between sleeps
+        long[] rateInfo = new long[2];
+        rateInfo[0] = 100; // start with a sleep time of 100
+
+        BufferedReader br = null;
+        Reader inputReader = null;
+        try {
+            if (inputFilename.equals("-")) {
+                inputReader = new InputStreamReader(System.in);
+            } else {
+                inputReader = new FileReader(inputFilename);
+            }
+            br = new BufferedReader(inputReader);
+            String inputLine = null;
+            boolean firstLine = true;
+            EventWrapper eventWrapper = null;
+            for (startTime = System.nanoTime(); (inputLine = br.readLine()) != null; startTime = System.nanoTime()) {
+                if (firstLine) {
+                    JSONObject jsonRecord = new JSONObject(inputLine);
+                    createEventTypeInfo(jsonRecord);
+                    System.out.println(eventTypeInfoMap);
+                    if (eventTypeInfoMap.size() == 0) {
+                        return;
+                    }
+                    firstLine = false;
+                    continue;
+                }
+
+                try {
+                    JSONObject jsonRecord = new JSONObject(inputLine);
+                    int classIndex = jsonRecord.getInt("_index");
+                    EventTypeInfo eventTypeInfo = eventTypeInfoMap.get(classIndex);
+                    if (eventTypeInfo == null) {
+                        System.err.printf("Invalid _index value %d\n",
+                                          classIndex);
+                        return;
+                    }
+
+                    Object event = makeRecord(jsonRecord,
+                                              eventTypeInfo.getSchema());
+                    eventWrapper = new EventWrapper(eventTypeInfo.getStreamName(),
+                                                    event,
+                                                    new ArrayList<CompoundKeyInfo>());
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    System.err.printf("Bad input data %s\n", inputLine);
+                    continue;
+                }
+
+                // nextInt(bound) is always within [0, bound); the former
+                // Math.abs(nextInt()) % bound went negative whenever nextInt()
+                // returned Integer.MIN_VALUE
+                int partition = rand.nextInt(eventEmitter.getNodeCount());
+
+                eventEmitter.emit(partition, eventWrapper);
+                emitCount++;
+
+                // the rest of the stuff in this block is just to maintain the
+                // rate
+                processTimes[processTimePointer] = System.nanoTime()
+                        - startTime;
+                processTimePointer = (processTimePointer == PROCESS_TIME_LIST_MAX_SIZE - 1) ? 0
+                        : processTimePointer + 1;
+                if (emitCount == 1 || emitCount % 20 == 0) {
+                    rateInfo = getRateInfo(rateInfo);
+                }
+
+                // if it's time, display the actual emit rate
+                if (intervalStart == 0) {
+                    intervalStart = System.currentTimeMillis();
+                } else {
+                    long interval = System.currentTimeMillis() - intervalStart;
+                    // long arithmetic so that large intervals cannot overflow
+                    if (interval >= (displayRateInterval * 1000L)) {
+                        double rate = (emitCount - emitCountStart)
+                                / (interval / 1000.0);
+                        System.out.println("Rate is " + rate);
+                        intervalStart = System.currentTimeMillis();
+                        emitCountStart = emitCount;
+                    }
+                }
+
+                if (rateInfo[1] == 0 || emitCount % rateInfo[1] == 0) {
+                    try {
+                        Thread.sleep(rateInfo[0]);
+                    } catch (InterruptedException ie) {
+                        // best effort: a shortened nap only affects pacing
+                    }
+                }
+            }
+            System.out.printf("Emitted %d events\n", emitCount);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            closeQuietly(br);
+            closeQuietly(inputReader);
+        }
+    }
+
+    /** Closes the reader if it was opened, ignoring any failure to close. */
+    private static void closeQuietly(Reader reader) {
+        if (reader == null) {
+            return;
+        }
+        try {
+            reader.close();
+        } catch (Exception ignored) {
+            // nothing useful can be done if closing fails
+        }
+    }
+
+    /**
+     * Registers the event types described by the first input line.
+     * <p>
+     * Each key of the given object is a fully qualified class name whose value
+     * is an object with an integer "classIndex" and a string "streamName". For
+     * every entry whose class can be loaded, a schema is built and stored
+     * under its class index.
+     * <p>
+     * Entries are handled independently: a malformed or unloadable entry is
+     * reported and skipped without affecting the others. (JSON key order is
+     * not defined, so stopping at the first failure would register an
+     * arbitrary subset.)
+     *
+     * @param classInfo
+     *            JSON object mapping class names to their index and stream
+     */
+    @SuppressWarnings("unchecked")
+    public void createEventTypeInfo(JSONObject classInfo) {
+        for (Iterator it = classInfo.keys(); it.hasNext();) {
+            String className = (String) it.next();
+            try {
+                JSONObject jsonEventTypeInfo = classInfo.getJSONObject(className);
+                int classIndex = jsonEventTypeInfo.getInt("classIndex");
+                String streamName = jsonEventTypeInfo.getString("streamName");
+
+                Class clazz = Class.forName(className);
+                Schema schema = new Schema(clazz);
+                eventTypeInfoMap.put(classIndex, new EventTypeInfo(schema,
+                                                                   streamName));
+            } catch (JSONException je) {
+                je.printStackTrace();
+            } catch (ClassNotFoundException cnfe) {
+                System.err.println("Could not locate class " + className);
+            }
+        }
+    }
+
+    /**
+     * Builds an event object of the schema's type from a JSON record.
+     * <p>
+     * A new instance is created through the type's no-argument constructor.
+     * For every key of the record that names a schema property and whose value
+     * is not JSON null, the property's setter is invoked with the converted
+     * value. Keys unknown to the schema are ignored.
+     *
+     * @param jsonRecord
+     *            record holding the property values
+     * @param schema
+     *            schema of the event class to instantiate
+     * @return the populated event object
+     * @throws RuntimeException
+     *             wrapping any failure to instantiate the event, read the
+     *             record or invoke a setter
+     */
+    @SuppressWarnings("unchecked")
+    public static Object makeRecord(JSONObject jsonRecord, Schema schema) {
+        Object event = null;
+        try {
+            event = schema.getType().newInstance();
+
+            Iterator keys = jsonRecord.keys();
+            while (keys.hasNext()) {
+                String key = (String) keys.next();
+                Property property = schema.getProperties().get(key);
+
+                // only keys that the schema knows about are applied
+                if (property != null) {
+                    Method setter = property.getSetterMethod();
+                    Object rawValue = jsonRecord.get(key);
+
+                    // JSON null leaves the property at its default
+                    if (!rawValue.equals(JSONObject.NULL)) {
+                        setter.invoke(event,
+                                      makeSettableValue(property, rawValue));
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        return event;
+    }
+
+    /**
+     * Converts a value read from JSON into an object that can be passed to
+     * the setter of the given property.
+     * <ul>
+     * <li>array and list properties require a {@link JSONArray} and are
+     * converted element by element;</li>
+     * <li>primitive properties accept a number or a boolean; numbers are
+     * converted to the wrapper of the primitive type so that the reflective
+     * setter call does not reject, for example, a Double for a float
+     * property;</li>
+     * <li>String properties require a String;</li>
+     * <li>boxed numeric, BigDecimal and BigInteger properties accept any
+     * supported number and are converted to the property type without going
+     * through {@code long} when that would lose information;</li>
+     * <li>any other property is treated as a nested record when the value is
+     * a {@link JSONObject}.</li>
+     * </ul>
+     *
+     * @param property
+     *            property the value is destined for
+     * @param value
+     *            value as returned by the JSON library
+     * @return the converted value, or null when the value does not fit the
+     *         property (a message is printed to standard error for most
+     *         mismatches)
+     */
+    @SuppressWarnings("unchecked")
+    public static Object makeSettableValue(Property property, Object value) {
+        String propertyName = property.getName();
+        Class propertyType = property.getType();
+
+        if (propertyType.isArray()) {
+            if (!(value instanceof JSONArray)) {
+                System.err.println("Type mismatch for field " + propertyName);
+                return null;
+            }
+            return makeArray(property, (JSONArray) value);
+        } else if (property.isList()) {
+            if (!(value instanceof JSONArray)) {
+                System.err.println("Type mismatch for field " + propertyName);
+                return null;
+            }
+            return makeList(property, (JSONArray) value);
+        } else if (propertyType.isPrimitive()) {
+            if (value instanceof Boolean) {
+                return value;
+            }
+            if (value instanceof Number) {
+                return toPrimitiveWrapper(propertyType, (Number) value);
+            }
+            System.err.println("Type mismatch for field " + propertyName
+                    + "; expected number or boolean, found "
+                    + value.getClass());
+            return null;
+        } else if (propertyType.equals(String.class)) {
+            if (!(value instanceof String)) {
+                System.err.println("Type mismatch for field " + propertyName
+                        + "; expected String, found " + value.getClass());
+                return null;
+            }
+            return value;
+        } else if (property.isNumber()) {
+            if (!(value instanceof Integer || value instanceof Long
+                    || value instanceof Float || value instanceof Double
+                    || value instanceof BigDecimal || value instanceof BigInteger)) {
+                return null;
+            }
+
+            Number number = (Number) value;
+            Number adjustedValue = number;
+            if (propertyType.equals(Long.class) && !(value instanceof Long)) {
+                adjustedValue = Long.valueOf(number.longValue());
+            } else if (propertyType.equals(Integer.class)
+                    && !(value instanceof Integer)) {
+                adjustedValue = Integer.valueOf(number.intValue());
+            } else if (propertyType.equals(Double.class)
+                    && !(value instanceof Double)) {
+                adjustedValue = Double.valueOf(number.doubleValue());
+            } else if (propertyType.equals(Float.class)
+                    && !(value instanceof Float)) {
+                adjustedValue = Float.valueOf(number.floatValue());
+            } else if (propertyType.equals(BigDecimal.class)) {
+                adjustedValue = toBigDecimal(number);
+            } else if (propertyType.equals(BigInteger.class)) {
+                adjustedValue = toBigInteger(number);
+            }
+            return adjustedValue;
+        } else if (value instanceof JSONObject) {
+            return makeRecord((JSONObject) value, property.getSchema());
+        }
+
+        return null;
+    }
+
+    /**
+     * Converts a number to the wrapper of the given numeric primitive type;
+     * returns the number unchanged for any other primitive type.
+     */
+    private static Object toPrimitiveWrapper(Class<?> primitiveType,
+                                             Number number) {
+        if (primitiveType == int.class) {
+            return Integer.valueOf(number.intValue());
+        } else if (primitiveType == long.class) {
+            return Long.valueOf(number.longValue());
+        } else if (primitiveType == double.class) {
+            return Double.valueOf(number.doubleValue());
+        } else if (primitiveType == float.class) {
+            return Float.valueOf(number.floatValue());
+        } else if (primitiveType == short.class) {
+            return Short.valueOf(number.shortValue());
+        } else if (primitiveType == byte.class) {
+            return Byte.valueOf(number.byteValue());
+        }
+        return number;
+    }
+
+    /** Converts a number to BigDecimal without dropping its fraction. */
+    private static BigDecimal toBigDecimal(Number number) {
+        if (number instanceof BigDecimal) {
+            return (BigDecimal) number;
+        } else if (number instanceof BigInteger) {
+            return new BigDecimal((BigInteger) number);
+        } else if (number instanceof Double || number instanceof Float) {
+            // the decimal string form avoids binary floating-point artifacts
+            return new BigDecimal(number.toString());
+        }
+        return BigDecimal.valueOf(number.longValue());
+    }
+
+    /** Converts a number to BigInteger without narrowing large values. */
+    private static BigInteger toBigInteger(Number number) {
+        if (number instanceof BigInteger) {
+            return (BigInteger) number;
+        } else if (number instanceof BigDecimal) {
+            return ((BigDecimal) number).toBigInteger();
+        }
+        return BigInteger.valueOf(number.longValue());
+    }
+
+    /**
+     * Converts a JSON array into a list whose elements have been converted
+     * according to the component property of the given list property.
+     *
+     * @param property
+     *            list-typed property the result is destined for
+     * @param jsonArray
+     *            source elements
+     * @return a new list with one converted entry per source element
+     * @throws RuntimeException
+     *             wrapping a JSONException raised while reading an element
+     */
+    public static Object makeList(Property property, JSONArray jsonArray) {
+        Property elementProperty = property.getComponentProperty();
+        int elementCount = jsonArray.length();
+        List<Object> converted = new ArrayList<Object>(elementCount);
+
+        int index = 0;
+        try {
+            while (index < elementCount) {
+                Object element = jsonArray.get(index);
+                converted.add(makeSettableValue(elementProperty, element));
+                index++;
+            }
+        } catch (JSONException je) {
+            throw new RuntimeException(je);
+        }
+
+        return converted;
+    }
+
+    /**
+     * Converts a JSON array into a Java array whose component type is the
+     * type of the given property's component property. Every element is
+     * converted before being stored.
+     *
+     * @param property
+     *            array-typed property the result is destined for
+     * @param jsonArray
+     *            source elements
+     * @return a new array with one converted entry per source element
+     * @throws RuntimeException
+     *             wrapping a JSONException raised while reading an element
+     */
+    @SuppressWarnings("unchecked")
+    public static Object makeArray(Property property, JSONArray jsonArray) {
+        Property elementProperty = property.getComponentProperty();
+        Class elementType = elementProperty.getType();
+        int elementCount = jsonArray.length();
+
+        Object result = Array.newInstance(elementType, elementCount);
+
+        int index = 0;
+        try {
+            while (index < elementCount) {
+                Object element = jsonArray.get(index);
+                Array.set(result, index,
+                          makeSettableValue(elementProperty, element));
+                index++;
+            }
+        } catch (JSONException je) {
+            throw new RuntimeException(je);
+        }
+        return result;
+    }
+
+    /**
+     * Recomputes the sleep schedule needed to hold the adjusted expected rate.
+     * <p>
+     * The average per-event processing time is taken from the recorded
+     * samples and padded by 30%. Whatever remains of one second after
+     * processing the expected number of events is spread over sleeps. When
+     * the sleep per event would be shorter than one millisecond, one
+     * millisecond naps are taken every few events instead.
+     *
+     * @param rateInfo
+     *            two-element array that is updated in place
+     * @return the same array: element 0 is the sleep time in milliseconds,
+     *         element 1 the number of events between sleeps
+     */
+    private long[] getRateInfo(long[] rateInfo) {
+        // Slots are filled in order starting at index 0, so until the buffer
+        // has wrapped only the first emitCount slots hold real samples.
+        // Averaging over the whole buffer would dilute the first estimate
+        // with slots that were never written.
+        int entryCount = Math.min(emitCount, processTimes.length);
+        long totalTimeNanos = 0;
+        for (int i = 0; i < entryCount; i++) {
+            totalTimeNanos += processTimes[i];
+        }
+
+        long averageTimeMicros = 0;
+        if (entryCount > 0) {
+            averageTimeMicros = (long) ((totalTimeNanos / (double) entryCount) / 1000.0);
+        }
+        // fudge the time for additional overhead
+        averageTimeMicros += (long) (averageTimeMicros * 0.30);
+
+        long sleepTimeMicros = 0;
+        long millis = 0;
+
+        // microseconds of each second spent processing at the expected rate
+        long timeToMeetRateMicros = adjustedExpectedRate * averageTimeMicros;
+        long leftOver = 1000000 - timeToMeetRateMicros;
+        if (leftOver <= 0) {
+            sleepTimeMicros = 0;
+        } else {
+            sleepTimeMicros = (leftOver / adjustedExpectedRate)
+                    - sleepOverheadMicros;
+        }
+
+        // how many events can be processed in the nanos time?
+        int eventsBeforeSleep = 1;
+        if (sleepTimeMicros < 1000) {
+            // too short for a per-event sleep: nap for one millisecond every
+            // eventsBeforeSleep events instead
+            sleepTimeMicros = 1000 + sleepOverheadMicros;
+            millis = 1;
+            double numNapsDouble = ((double) leftOver / sleepTimeMicros);
+            int numNaps = (int) Math.ceil(numNapsDouble);
+            if (numNaps > 0) {
+                eventsBeforeSleep = adjustedExpectedRate / numNaps;
+            }
+
+            if (leftOver <= 0) {
+                // no time to spare: zero-length sleep once per 1000 events
+                millis = 0;
+                eventsBeforeSleep = 1000;
+            }
+        } else {
+            millis = sleepTimeMicros / 1000;
+        }
+
+        rateInfo[0] = millis;
+        rateInfo[1] = eventsBeforeSleep;
+        return rateInfo;
+    }
+
+    /**
+     * Pairs the schema of an event class with the name of the stream its
+     * events are emitted on.
+     */
+    public static class EventTypeInfo {
+        private final Schema schema;
+        private final String streamName;
+
+        /**
+         * @param schema
+         *            schema of the event class
+         * @param streamName
+         *            name of the stream the events belong to
+         */
+        public EventTypeInfo(Schema schema, String streamName) {
+            this.streamName = streamName;
+            this.schema = schema;
+        }
+
+        /** @return the schema of the event class */
+        public Schema getSchema() {
+            return this.schema;
+        }
+
+        /** @return the name of the stream the events belong to */
+        public String getStreamName() {
+            return this.streamName;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/MethodInvoker.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/MethodInvoker.java b/s4-core/src/main/java/org/apache/s4/util/MethodInvoker.java
new file mode 100644
index 0000000..a98c46a
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/MethodInvoker.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Utility class to invoke a method on an arbitrary object, if such a method is
+ * defined.
+ */
+public class MethodInvoker {
+
+    /**
+     * Find and invoke a getter on an object. A getter for parameter N is a
+     * public method whose name equals "get" + N, ignoring case, and which takes
+     * zero arguments. If no such method is found, an exception is thrown.
+     * 
+     * @param obj
+     *            object on which getter id to be invoked
+     * @param name
+     *            parameter name
+     * @return value returned by the getter method, if such a method is found.
+     *         Null if the object is null.
+     * @throws Exception
+     *             if no suitable getter is found, or if the getter method
+     *             throws an exception. The latter is wrapped in an
+     *             {@link InvocationTargetException}
+     */
+    public static Object invokeGetter(Object obj, String name) throws Exception {
+        if (obj != null) {
+            Method getter = findGetter(obj.getClass(), name);
+
+            if (getter != null) {
+                return getter.invoke(obj);
+
+            } else {
+                throw new NoGetterException(obj.getClass(), name);
+            }
+
+        } else {
+            throw new Exception("Null Target");
+        }
+    }
+
+    private static ConcurrentHashMap<Class<?>, HashMap<String, Method>> gettersMap = new ConcurrentHashMap<Class<?>, HashMap<String, Method>>();
+
+    private static Method findGetter(Class<?> clazz, String name) {
+        HashMap<String, Method> getters = gettersMap.get(clazz);
+
+        if (getters == null) {
+            HashMap<String, Method> newGetters = reflectGetters(clazz);
+
+            getters = gettersMap.putIfAbsent(clazz, newGetters);
+
+            if (getters == null)
+                getters = newGetters;
+        }
+
+        return getters.get(name.toLowerCase());
+    }
+
+    private static HashMap<String, Method> reflectGetters(Class<?> clazz) {
+        HashMap<String, Method> map = new HashMap<String, Method>();
+
+        for (Method m : clazz.getMethods()) {
+            // the method we are interested in should be named get* and take no
+            // arguments.
+            String n = m.getName();
+
+            if (m.getParameterTypes().length == 0 && n.startsWith("get")) {
+                String name = n.substring(3).toLowerCase();
+
+                map.put(name, m);
+            }
+        }
+
+        return map;
+    }
+
+    private static Class<?>[] getTypes(Object[] args) {
+        Class<?>[] aT = new Class<?>[args.length];
+        for (int i = 0; i < args.length; ++i) {
+            aT[i] = args[i].getClass();
+        }
+
+        return aT;
+    }
+
+    public static class NoGetterException extends Exception {
+        public NoGetterException(Class<?> clazz, String name) {
+            super("No Getter for attribute " + clazz.getName() + "." + name);
+        }
+    }
+
+    public static class NoMethodException extends Exception {
+        public NoMethodException(Class<?> clazz, String name, Object[] args) {
+            super("No method found " + clazz.getName() + "." + name + "("
+                    + MethodInvoker.getTypes(args) + ")");
+        }
+    }
+
+    public static class NullTargetException extends Exception {
+        public NullTargetException() {
+            super();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/MetricsName.java b/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
new file mode 100644
index 0000000..73762e4
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+public enum MetricsName {
+    // metrics event name
+    S4_APP_METRICS("S4::S4AppMetrics"), S4_EVENT_METRICS("S4::S4EventMetrics"), S4_CORE_METRICS(
+            "S4::S4CoreMetrics"),
+
+    // metrics name
+    low_level_listener_msg_in_ct("lll_in"), low_level_listener_msg_drop_ct(
+            "lll_dr"), low_level_listener_qsz("lll_qsz"), low_level_listener_badmsg_ct(
+            "lll_bad"), // exception can't be caught
+    generic_listener_msg_in_ct("gl_in"), pecontainer_ev_dq_ct("pec_dq"), pecontainer_ev_nq_ct(
+            "pec_nq"), pecontainer_msg_drop_ct("pec_dr"), pecontainer_qsz(
+            "pec_qsz"), pecontainer_qsz_w("pec_qsz_w"), pecontainer_ev_process_ct(
+            "pec_pr"), pecontainer_pe_ct("pec_pe"), pecontainer_ev_err_ct(
+            "pec_err"), // exception can't be caught
+    pecontainer_exec_elapse_time("pec_exec_t"), low_level_emitter_msg_out_ct(
+            "lle_out"), low_level_emitter_out_err_ct("lle_err"), low_level_emitter_qsz(
+            "lle_qsz"), s4_core_exit_ct("s4_ex_ct"), s4_core_free_mem("s4_fmem"), pe_join_ev_ct(
+            "pe_j_ct"), pe_error_count("pe_err");
+
+    private final String eventShortName;
+
+    private MetricsName(String eventShortName) {
+        this.eventShortName = eventShortName;
+    }
+
+    public String toString() {
+        return eventShortName;
+    }
+
+    public static void main(String[] args) {
+        System.out.println(generic_listener_msg_in_ct.toString());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/MiscConstants.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/MiscConstants.java b/s4-core/src/main/java/org/apache/s4/util/MiscConstants.java
new file mode 100644
index 0000000..7a072f0
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/MiscConstants.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+/**
+ * Holder for miscellaneous string constants.
+ */
+public class MiscConstants {
+    /** Schema name used for the event wrapper. */
+    public static final String EVENT_WRAPPER_SCHEMA_NAME = "EventWrapper";
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/NumberUtils.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/NumberUtils.java b/s4-core/src/main/java/org/apache/s4/util/NumberUtils.java
new file mode 100644
index 0000000..708320e
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/NumberUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+import java.math.BigInteger;
+
+/**
+ * Helpers for treating a {@code long} as an unsigned 64-bit quantity.
+ */
+public class NumberUtils {
+
+    /** 2^64: added to a negative {@code long} to obtain its unsigned value. */
+    private final static BigInteger B64 = BigInteger.ONE.shiftLeft(64);
+
+    /**
+     * Interprets the 64 bits of {@code number} as an unsigned value.
+     *
+     * @param number the value to reinterpret
+     * @return {@code number} itself when it is non-negative, otherwise
+     *         {@code number + 2^64}
+     */
+    public static BigInteger getLongAsUnsignedBigInteger(long number) {
+        BigInteger signed = BigInteger.valueOf(number);
+        return (number < 0) ? signed.add(B64) : signed;
+    }
+
+    /**
+     * Renders {@code bi} in upper-case hexadecimal, left-padded with zeroes to
+     * at least 16 characters. Longer renderings are returned unpadded.
+     *
+     * @param bi the value to render
+     * @return the padded, upper-case hexadecimal string
+     */
+    public static String getUnsignedBigIntegerAsHex(BigInteger bi) {
+        StringBuilder padded = new StringBuilder(bi.toString(16));
+        while (padded.length() < 16) {
+            padded.insert(0, '0');
+        }
+        return padded.toString().toUpperCase();
+    }
+
+    /**
+     * Prints the hexadecimal rendering of three sample values.
+     */
+    public static void main(String[] args) {
+        long[] samples = { 0x00f1200000004561L, 0x80f12000dd004561L, 0x1161L };
+        for (long sample : samples) {
+            BigInteger unsigned = getLongAsUnsignedBigInteger(sample);
+            System.out.println(getUnsignedBigIntegerAsHex(unsigned));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/PreprodLogger.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/PreprodLogger.java b/s4-core/src/main/java/org/apache/s4/util/PreprodLogger.java
new file mode 100644
index 0000000..6b4d33d
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/PreprodLogger.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.text.DecimalFormat;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Opens numbered output files of the form {@code <prefix>.NNNN.txt}.
+ */
+public class PreprodLogger {
+
+    String filenamePrefix;
+    File file;
+    FileOutputStream fos;
+
+    // Formats the sequence number as at least four digits, e.g. 0001.
+    private DecimalFormat formatter = new DecimalFormat("0000");
+
+    public PreprodLogger() {
+    }
+
+    /**
+     * @param filenamePrefix the part of the file name that precedes the
+     *                       sequence number
+     */
+    public void setFilenamePrefix(String filenamePrefix) {
+        this.filenamePrefix = filenamePrefix;
+    }
+
+    /**
+     * Picks the first sequence number, counting up from 1, whose file does not
+     * exist yet and opens an output stream on it. A failure to open the stream
+     * is logged and otherwise ignored. A stream assigned by an earlier call is
+     * not closed here.
+     */
+    public void openNewFile() {
+        int sequence = 0;
+        do {
+            sequence++;
+            file = new File(filenamePrefix + "." + formatter.format(sequence)
+                    + ".txt");
+        } while (file.exists());
+
+        try {
+            fos = new FileOutputStream(file);
+        } catch (IOException ioe) {
+            Logger.getLogger("s4")
+                  .error("Some sort of exception opening event logging file",
+                         ioe);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/ReverseDoubleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/ReverseDoubleOutputFormatter.java b/s4-core/src/main/java/org/apache/s4/util/ReverseDoubleOutputFormatter.java
new file mode 100644
index 0000000..06b72f4
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/ReverseDoubleOutputFormatter.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+import org.apache.s4.processor.OutputFormatter;
+
+/**
+ * Output formatter that renders a {@link Double} value as a string.
+ */
+public class ReverseDoubleOutputFormatter implements OutputFormatter {
+
+    /**
+     * @param outputValue the value to render; must be a {@code Double} or
+     *                    {@code null}
+     * @return the string form of the value ({@code "null"} for {@code null})
+     * @throws ClassCastException if {@code outputValue} is not a
+     *                            {@code Double}
+     */
+    @Override
+    public Object format(Object outputValue) {
+        // The cast is kept so that non-Double input is rejected.
+        return String.valueOf((Double) outputValue);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/ReverseIntegerOutputFormatter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/ReverseIntegerOutputFormatter.java b/s4-core/src/main/java/org/apache/s4/util/ReverseIntegerOutputFormatter.java
new file mode 100644
index 0000000..1ee2954
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/ReverseIntegerOutputFormatter.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+import org.apache.s4.processor.OutputFormatter;
+
+/**
+ * Output formatter that renders an {@link Integer} value as a string.
+ */
+public class ReverseIntegerOutputFormatter implements OutputFormatter {
+
+    /**
+     * @param outputValue the value to render; must be an {@code Integer} or
+     *                    {@code null}
+     * @return the string form of the value ({@code "null"} for {@code null})
+     * @throws ClassCastException if {@code outputValue} is not an
+     *                            {@code Integer}
+     */
+    @Override
+    public Object format(Object outputValue) {
+        // The cast is kept so that non-Integer input is rejected.
+        return String.valueOf((Integer) outputValue);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/S4Util.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/S4Util.java b/s4-core/src/main/java/org/apache/s4/util/S4Util.java
new file mode 100644
index 0000000..99a2302
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/S4Util.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+/**
+ * Miscellaneous process-level helpers.
+ */
+public class S4Util {
+
+    /**
+     * Derives the process id from the runtime MX bean's name by parsing the
+     * text before the first {@code '@'} as a number.
+     * NOTE(review): this relies on the JVM reporting its name as
+     * {@code pid@host}, which is common but not guaranteed - confirm for the
+     * target JVMs.
+     *
+     * @return the parsed process id
+     * @throws NumberFormatException if the leading part of the name is not a
+     *                               number
+     */
+    public static long getPID() {
+        java.lang.management.RuntimeMXBean runtime =
+                java.lang.management.ManagementFactory.getRuntimeMXBean();
+        String[] nameParts = runtime.getName().split("@");
+        return Long.parseLong(nameParts[0]);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/SlotUtils.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/SlotUtils.java b/s4-core/src/main/java/org/apache/s4/util/SlotUtils.java
new file mode 100644
index 0000000..9e456c1
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/SlotUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+
+/**
+ * Maps timestamps onto fixed-size time slots. A slot is identified by its
+ * start time: the largest multiple of the slot size that does not exceed the
+ * timestamp.
+ */
+public class SlotUtils {
+
+    private int slotSize; // slot size in seconds
+
+    /**
+     * @param slotSize the slot size, in seconds
+     */
+    public SlotUtils(int slotSize) {
+        this.slotSize = slotSize;
+    }
+
+    /**
+     * @param slotSize the new slot size, in seconds
+     */
+    public void setSize(int slotSize) {
+        this.slotSize = slotSize;
+    }
+
+    /**
+     * Returns the start of the slot that contains {@code time}.
+     * <p>
+     * The computation stays in {@code long} arithmetic (no round trip through
+     * {@code double}) and floors the quotient, so negative times map to the
+     * slot that starts at or before them.
+     *
+     * @param time a timestamp in the same unit as the slot size
+     * @return the slot start time
+     * @throws ArithmeticException if the slot size is zero
+     */
+    public Long getSlotAtTime(long time) {
+        long quotient = time / slotSize;
+        // Integer division truncates toward zero; step down once when the
+        // exact quotient is negative and fractional to obtain a true floor.
+        if ((time % slotSize != 0) && ((time < 0) != (slotSize < 0))) {
+            quotient--;
+        }
+        return slotSize * quotient;
+    }
+
+    /**
+     * @return the slot containing the current wall-clock time
+     */
+    public Long getCurrentSlot() {
+        long nowSeconds = System.currentTimeMillis() / 1000; // convert to seconds
+        return getSlotAtTime(nowSeconds);
+    }
+
+    /**
+     * Returns the slot located {@code index} slots away from the one that
+     * contains {@code currTimeStamp}.
+     *
+     * @param index         number of slots to move (negative moves backwards)
+     * @param currTimeStamp the reference timestamp
+     * @return the start time of the requested slot
+     */
+    public Long getSlot(int index, long currTimeStamp) {
+        // Widen before multiplying so a large index cannot overflow int.
+        long offset = (long) index * slotSize;
+        return getSlotAtTime(currTimeStamp + offset);
+    }
+
+    /**
+     * Tells whether {@code slot} lies before the window that ends at the slot
+     * containing {@code time}.
+     *
+     * @param slot       the slot start time to test
+     * @param windowSize the window length, in the same unit as the slot size
+     * @param time       the reference timestamp
+     * @return {@code true} if {@code slot} is earlier than the slot of
+     *         {@code time} minus {@code windowSize}
+     */
+    public boolean isOutsideWindow(Long slot, int windowSize, long time) {
+        long windowBoundary = getSlotAtTime(time) - windowSize;
+        return slot.longValue() < windowBoundary;
+    }
+
+    /**
+     * Prints the current slot for a slot size of 300 seconds.
+     */
+    public static void main(String[] args) {
+        SlotUtils s = new SlotUtils(300);
+        System.out.printf("%d\n", s.getCurrentSlot());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/ToStringOutputFormatter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/ToStringOutputFormatter.java b/s4-core/src/main/java/org/apache/s4/util/ToStringOutputFormatter.java
new file mode 100644
index 0000000..4035709
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/ToStringOutputFormatter.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+import org.apache.s4.processor.OutputFormatter;
+
+/**
+ * Output formatter that renders any value through its {@code toString()}.
+ */
+public class ToStringOutputFormatter implements OutputFormatter {
+
+    /**
+     * @param outputValue the value to render; may be {@code null}
+     * @return {@code outputValue.toString()}, or the string {@code "null"}
+     *         when {@code outputValue} is {@code null}
+     */
+    @Override
+    public Object format(Object outputValue) {
+        if (outputValue == null) {
+            return "null";
+        }
+        return outputValue.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/Watcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/Watcher.java b/s4-core/src/main/java/org/apache/s4/util/Watcher.java
new file mode 100644
index 0000000..39a9542
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/Watcher.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+import static org.apache.s4.util.MetricsName.S4_CORE_METRICS;
+import static org.apache.s4.util.MetricsName.s4_core_exit_ct;
+import static org.apache.s4.util.MetricsName.s4_core_free_mem;
+import org.apache.s4.logger.Monitor;
+import org.apache.s4.persist.Persister;
+import org.apache.s4.processor.AsynchronousEventProcessor;
+
+import java.io.File;
+import java.text.MessageFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Background task that periodically logs queue size and memory figures,
+ * terminates the JVM when free memory drops below a threshold, and terminates
+ * the JVM when the watched configuration file is modified.
+ * <p>
+ * Call {@link #init()} to start the watcher on its own thread.
+ */
+public class Watcher implements Runnable {
+    /** Pause between two regular checks, in milliseconds. */
+    private static final long CHECK_INTERVAL_MILLIS = 15000;
+    /** Pause after a failed check, in milliseconds. */
+    private static final long ERROR_BACKOFF_MILLIS = 30000;
+
+    Runtime rt = Runtime.getRuntime();
+    AsynchronousEventProcessor peContainer;
+    Persister persister;
+    Persister localPersister;
+    String configFilename;
+    // Last-modified time seen for the config file; -1 until first observed.
+    long configFileTime = -1;
+    Monitor monitor;
+
+    public void setMonitor(Monitor monitor) {
+        this.monitor = monitor;
+    }
+
+    // Exit threshold for the estimated free memory, in bytes (default 200 MB).
+    private long minimumMemory = 200 * 1024 * 1024;
+
+    SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
+
+    /**
+     * @param minimumMemory free-memory threshold in bytes below which the
+     *                      process exits
+     */
+    public void setMinimumMemory(long minimumMemory) {
+        this.minimumMemory = minimumMemory;
+    }
+
+    public void setPeContainer(AsynchronousEventProcessor peContainer) {
+        this.peContainer = peContainer;
+    }
+
+    public void setPersister(Persister persister) {
+        this.persister = persister;
+    }
+
+    public void setLocalPersister(Persister localPersister) {
+        this.localPersister = localPersister;
+    }
+
+    /**
+     * @param configFilename path of the file whose modification triggers an
+     *                       exit; {@code null} disables the check
+     */
+    public void setConfigFilename(String configFilename) {
+        this.configFilename = configFilename;
+    }
+
+    public Watcher() {
+
+    }
+
+    /**
+     * Starts the watcher on a new thread.
+     */
+    public void init() {
+        Thread t = new Thread(this);
+        t.start();
+    }
+
+    /**
+     * Runs the check cycle until the thread is interrupted.
+     * <p>
+     * An exception thrown by a check is logged and followed by a longer pause,
+     * after which checking resumes; a single failure therefore no longer ends
+     * monitoring for the rest of the process lifetime.
+     */
+    public void run() {
+        while (true) {
+            long pauseMillis = CHECK_INTERVAL_MILLIS;
+            try {
+                logStatus();
+                memoryCheck();
+                configCheck();
+            } catch (Exception e) {
+                Logger.getLogger("s4")
+                      .warn("Some sort of exception in Watcher thread", e);
+                pauseMillis = ERROR_BACKOFF_MILLIS;
+            }
+
+            try {
+                Thread.sleep(pauseMillis);
+            } catch (InterruptedException ie) {
+                // Treat interruption as a stop request and keep the flag set
+                // for whoever inspects this thread afterwards.
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+    /**
+     * Logs the processing queue size (when a container is configured) and the
+     * JVM memory figures.
+     */
+    private void logStatus() {
+        if (peContainer != null) {
+            String template1 = "{0,number,#######0} waiting processing";
+            Logger.getLogger("s4")
+                  .info(MessageFormat.format(template1,
+                                             peContainer.getQueueSize()));
+        }
+
+        String template2 = "Total: {0,number,#######0}, max {1,number,#######0}, free {2,number,#######0}";
+        Logger.getLogger("s4")
+              .info(MessageFormat.format(template2,
+                                         rt.totalMemory(),
+                                         rt.maxMemory(),
+                                         rt.freeMemory()));
+    }
+
+    private ArrayList<byte[]> memoryHog = new ArrayList<byte[]>();
+
+    /**
+     * Reports the estimated free memory to the monitor (in megabytes) and
+     * exits the JVM with status 3 when it is below the configured minimum.
+     */
+    private void memoryCheck() {
+        long total = rt.totalMemory();
+        long max = rt.maxMemory();
+        long free = rt.freeMemory();
+        // Heap the JVM may still claim plus what is free in the current heap.
+        long actualFree = (max - total) + free;
+
+        try {
+            if (monitor != null) {
+                monitor.set(s4_core_free_mem.toString(),
+                            (int) (actualFree / 1024 / 1024.0),
+                            S4_CORE_METRICS.toString());
+            }
+
+            if (actualFree < minimumMemory) {
+                Logger.getLogger("s4").error("Too little memory remaining: "
+                        + actualFree + ". Exiting so process can be restarted");
+                if (monitor != null) {
+                    monitor.set(s4_core_exit_ct.toString(),
+                                1,
+                                S4_CORE_METRICS.toString());
+                }
+                System.exit(3);
+            }
+        } catch (Exception e) {
+            Logger.getLogger("s4").error("metrics name doesn't exist: ", e);
+        }
+
+        // Debug aid, keep disabled: uncommenting the next line makes every
+        // check retain another 10 MB so the low-memory exit can be exercised.
+        // memoryHog.add(new byte[10*1024*1024]);
+    }
+
+    /**
+     * Exits the JVM with status 4 when the config file's last-modified time is
+     * later than the one recorded on the first successful check. Does nothing
+     * when no file name is configured or the file does not exist.
+     */
+    private void configCheck() {
+        if (configFilename == null) {
+            return;
+        }
+
+        File file = new File(configFilename);
+        if (!file.exists()) {
+            return;
+        }
+        long lastModified = file.lastModified();
+        if (configFileTime == -1) {
+            // First observation: remember the baseline, nothing to compare yet.
+            configFileTime = lastModified;
+            return;
+        }
+
+        if (lastModified > configFileTime) {
+            Logger.getLogger("s4").info("Config file has changed. Exiting!!");
+            try {
+                if (monitor != null) {
+                    monitor.set(s4_core_exit_ct.toString(),
+                                1,
+                                S4_CORE_METRICS.toString());
+                }
+            } catch (Exception e) {
+                Logger.getLogger("s4").error("metrics name doesn't exist: ", e);
+            }
+            System.exit(4);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/clock/Clock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/clock/Clock.java b/s4-core/src/main/java/org/apache/s4/util/clock/Clock.java
new file mode 100644
index 0000000..cf9ffce
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/clock/Clock.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+
+package org.apache.s4.util.clock;
+
+/**
+ * Source of time for S4 components.
+ */
+public interface Clock {
+
+    /**
+     * Waits for the clock to reach {@code targetTime}.
+     * NOTE(review): blocking behaviour and the meaning of the returned value
+     * are defined by each implementation - confirm against the implementing
+     * classes.
+     *
+     * @param targetTime the time to wait for
+     * @return a time value supplied by the implementation
+     */
+    long waitForTime(long targetTime);
+
+    /**
+     * @return the current time according to this clock
+     */
+    long getCurrentTime();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/clock/ClockStreamsLoader.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/clock/ClockStreamsLoader.java b/s4-core/src/main/java/org/apache/s4/util/clock/ClockStreamsLoader.java
new file mode 100644
index 0000000..542405d
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/clock/ClockStreamsLoader.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+
+package org.apache.s4.util.clock;
+
+
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Registers a configured set of stream-name to field-name pairs with an
+ * {@link EventClock}.
+ */
+public class ClockStreamsLoader {
+
+    private static Logger logger = Logger.getLogger(ClockStreamsLoader.class);
+    Clock s4Clock;
+    HashMap<String, String> streamFieldMap;
+
+    /**
+     * @param streamFieldMap stream name mapped to the field name to register
+     *                       for that stream
+     */
+    public void setStreamFieldMap(HashMap<String, String> streamFieldMap) {
+        this.streamFieldMap = streamFieldMap;
+    }
+
+    /**
+     * @param s4Clock the clock to configure
+     */
+    public void setS4Clock(Clock s4Clock) {
+        this.s4Clock = s4Clock;
+    }
+
+    /**
+     * Adds every configured stream to the clock. Does nothing unless the
+     * clock is an {@link EventClock}.
+     */
+    public void addStreams() {
+        if (!(s4Clock instanceof EventClock)) {
+            return;
+        }
+        EventClock target = (EventClock) s4Clock;
+        for (java.util.Map.Entry<String, String> mapping : streamFieldMap
+                .entrySet()) {
+            target.addEventClockStream(mapping.getKey(), mapping.getValue());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/clock/DrivenClock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/clock/DrivenClock.java b/s4-core/src/main/java/org/apache/s4/util/clock/DrivenClock.java
new file mode 100644
index 0000000..d30469a
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/clock/DrivenClock.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+
+package org.apache.s4.util.clock;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Clock whose time only advances when {@link #updateTime(long)} is called.
+ * Threads can block in {@link #waitForTime(long)} until the clock reaches a
+ * target time.
+ */
+public class DrivenClock implements Clock {
+
+    private volatile long currentTime;
+    // Pending wait requests keyed by target time; also serves as the lock
+    // guarding itself and writes to currentTime.
+    private NavigableMap<Long, List<TimerRequest>> timerRequests = new TreeMap<Long, List<TimerRequest>>();
+
+    /**
+     * Advances the clock to {@code newCurrentTime} and wakes every pending
+     * request whose target time is at or before it. Updates that would move
+     * the clock backwards are ignored.
+     *
+     * @param newCurrentTime the new clock value
+     */
+    public void updateTime(long newCurrentTime) {
+        synchronized (timerRequests) {
+            // Checked under the lock so that concurrent updates cannot move
+            // the clock backwards.
+            if (newCurrentTime < currentTime) {
+                return;
+            }
+            currentTime = newCurrentTime;
+
+            // All buckets with a target time <= newCurrentTime are due. Every
+            // one of them must be woken, not only the last one removed.
+            NavigableMap<Long, List<TimerRequest>> dueRequests = timerRequests
+                    .headMap(newCurrentTime, true);
+            for (List<TimerRequest> requestsForTime : dueRequests.values()) {
+                for (TimerRequest timerRequest : requestsForTime) {
+                    timerRequest.wakeUp(newCurrentTime);
+                }
+            }
+            // The head map is a view: clearing it removes the due buckets
+            // from timerRequests.
+            dueRequests.clear();
+        }
+    }
+
+    /**
+     * Blocks until the clock has reached {@code targetTime}.
+     *
+     * @param targetTime the time to wait for
+     * @return the current time if the target has already been reached,
+     *         otherwise the value produced by the timer request once it is
+     *         woken
+     */
+    public long waitForTime(long targetTime) {
+        TimerRequest timerRequest = null;
+        synchronized (timerRequests) {
+            if (targetTime <= currentTime) {
+                return currentTime;
+            }
+            timerRequest = new TimerRequest(targetTime);
+            List<TimerRequest> requestsForTargetTime = timerRequests.get(targetTime);
+            if (requestsForTargetTime == null) {
+                requestsForTargetTime = new ArrayList<TimerRequest>();
+                timerRequests.put(targetTime, requestsForTargetTime);
+            }
+            requestsForTargetTime.add(timerRequest);
+        }
+        // Wait outside the lock so updateTime can run and wake this request.
+        return timerRequest.waitForTargetTime();
+    }
+
+
+    /**
+     * @return the current time, waiting for the first update if the clock is
+     *         still at zero
+     */
+    public long getCurrentTime() {
+        return getCurrentTime(true);
+    }
+
+    /**
+     * @param waitOnInitialization when {@code true} and the clock is still at
+     *                             zero, block until it has reached at least 1
+     * @return the current time
+     */
+    public long getCurrentTime(boolean waitOnInitialization) {
+        if (currentTime == 0 && waitOnInitialization) {
+            // if the clock has never been updated, wait for a first update
+            this.waitForTime(1);
+        }
+        return currentTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/clock/EventClock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/clock/EventClock.java b/s4-core/src/main/java/org/apache/s4/util/clock/EventClock.java
new file mode 100644
index 0000000..96fcd86
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/clock/EventClock.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+
+package org.apache.s4.util.clock;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.schema.Schema;
+import org.apache.s4.schema.Schema.Property;
+import org.apache.s4.schema.SchemaContainer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Clock driven by a timestamp field read from incoming events. Each stream
+ * registered through {@link #addEventClockStream(String, String)} names the
+ * field that carries the time for events of that stream.
+ */
+public class EventClock extends DrivenClock {
+
+    private static Logger logger = Logger.getLogger(EventClock.class);
+
+    // stream name -> name of the field holding the event time
+    Map<String, String> eventClockStreamsMap = new HashMap<String, String>();
+    SchemaContainer schemaContainer = new SchemaContainer();
+
+    /**
+     * Advances the clock from the given event when its stream has a
+     * registered timestamp field and that field is of type {@code long} or
+     * {@code Long}. Events of other streams, and fields that are missing or
+     * of another type, are ignored.
+     *
+     * @param eventWrapper the wrapped event to read the time from
+     * @throws RuntimeException wrapping any exception raised while reading
+     *                          the field or updating the time
+     */
+    public void update(EventWrapper eventWrapper) {
+        String timeField = eventClockStreamsMap.get(eventWrapper.getStreamName());
+        if (timeField == null) {
+            return;
+        }
+
+        Object event = eventWrapper.getEvent();
+        Schema schema = schemaContainer.getSchema(event.getClass());
+        Property property = schema.getProperties().get(timeField);
+        if (property == null) {
+            return;
+        }
+
+        boolean holdsLong = property.getType().equals(Long.TYPE)
+                || property.getType().equals(Long.class);
+        if (!holdsLong) {
+            return;
+        }
+
+        try {
+            long eventTime = (Long) property.getGetterMethod().invoke(event);
+            updateTime(eventTime);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Registers {@code fieldName} as the timestamp field of
+     * {@code streamName}. Replacing a different, previously registered field
+     * is allowed but logged as an error.
+     *
+     * @param streamName the stream to register
+     * @param fieldName  the name of the field carrying the event time
+     */
+    public void addEventClockStream(String streamName, String fieldName) {
+        String registeredField = eventClockStreamsMap.get(streamName);
+        if (registeredField != null && registeredField.equals(fieldName)) {
+            // Same mapping already present: nothing to do.
+            return;
+        }
+
+        if (registeredField != null) {
+            // we can add an runtime exception over error messages for
+            // making debugging easy
+            logger.error("Stream " + streamName
+                    + " already has a timestamp field defined "
+                    + registeredField);
+            logger.error("Stream " + streamName
+                    + " is updating the timestamp field to " + fieldName);
+        }
+        eventClockStreamsMap.put(streamName, fieldName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/clock/TimerRequest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/clock/TimerRequest.java b/s4-core/src/main/java/org/apache/s4/util/clock/TimerRequest.java
new file mode 100644
index 0000000..0666b69
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/clock/TimerRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+
+package org.apache.s4.util.clock;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A request to be woken up at a given target time.
+ * <p>
+ * One party blocks in {@link #waitForTargetTime()} while another delivers the
+ * wake-up by calling {@link #wakeUp(long)}. The hand-off goes through an
+ * internal {@link BlockingQueue}, so a wake-up delivered before the waiter
+ * starts waiting is not lost.
+ */
+public class TimerRequest {
+    private final long targetTime;
+    private final BlockingQueue<Long> blockingQueue = new LinkedBlockingQueue<Long>();
+
+    /**
+     * @param targetTime
+     *            the time this request is waiting for
+     */
+    public TimerRequest(long targetTime) {
+        this.targetTime = targetTime;
+    }
+
+    /**
+     * @return the target time supplied at construction
+     */
+    public long getTargetTime() {
+        return targetTime;
+    }
+
+    /**
+     * Delivers a wake-up to the thread blocked in (or about to call)
+     * {@link #waitForTargetTime()}.
+     *
+     * @param currentTime
+     *            the time value to hand to the waiter
+     */
+    public void wakeUp(long currentTime) {
+        blockingQueue.add(currentTime);
+    }
+
+    /**
+     * Blocks until {@link #wakeUp(long)} has delivered a time value.
+     *
+     * @return the value passed to {@code wakeUp}, or {@code -1} if the calling
+     *         thread was interrupted while waiting; in that case the thread's
+     *         interrupt status is set again before returning
+     */
+    public long waitForTargetTime() {
+        try {
+            return blockingQueue.take();
+        } catch (InterruptedException ie) {
+            // take() clears the interrupt flag when it throws; restore it so
+            // that code further up the stack can still see the interruption.
+            Thread.currentThread().interrupt();
+            return -1;
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/clock/WallClock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/clock/WallClock.java b/s4-core/src/main/java/org/apache/s4/util/clock/WallClock.java
new file mode 100644
index 0000000..829798e
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/clock/WallClock.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+
+package org.apache.s4.util.clock;
+
+
+/**
+ * {@link Clock} implementation driven by the system clock, using
+ * {@link System#currentTimeMillis()} as its time source.
+ */
+public class WallClock implements Clock {
+
+    /**
+     * @return the current system time in milliseconds, as reported by
+     *         {@link System#currentTimeMillis()}
+     */
+    @Override
+    public long getCurrentTime() {
+        return System.currentTimeMillis();
+    }
+
+    /**
+     * Sleeps the calling thread until {@code targetTime}, if it lies in the
+     * future, and then reports the current time.
+     * <p>
+     * If the thread is interrupted while sleeping, its interrupt status is
+     * restored and the method returns right away, so the returned time may be
+     * earlier than {@code targetTime}.
+     *
+     * @param targetTime
+     *            time to wait for, in milliseconds on the same scale as
+     *            {@link #getCurrentTime()}
+     * @return the current time when the wait ends
+     */
+    @Override
+    public long waitForTime(long targetTime) {
+        long remainingMillis = targetTime - getCurrentTime();
+        if (remainingMillis <= 0) {
+            // Target already reached: nothing to wait for.
+            return getCurrentTime();
+        }
+
+        try {
+            Thread.sleep(remainingMillis);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+        }
+        return getCurrentTime();
+    }
+
+}