You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:16 UTC
[40/50] [abbrv] Rename packages in preparation for move to Apache
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/LoadGenerator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/LoadGenerator.java b/s4-core/src/main/java/org/apache/s4/util/LoadGenerator.java
new file mode 100644
index 0000000..b94f086
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/LoadGenerator.java
@@ -0,0 +1,640 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.util;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.emitter.CommLayerEmitter;
+import org.apache.s4.emitter.EventEmitter;
+import org.apache.s4.schema.Schema;
+import org.apache.s4.schema.Schema.Property;
+import org.apache.s4.serialize.KryoSerDeser;
+import org.apache.s4.serialize.SerializerDeserializer;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.lang.reflect.Array;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class LoadGenerator {
+
+ public static void main(String args[]) {
+ Options options = new Options();
+ boolean warmUp = false;
+
+ options.addOption(OptionBuilder.withArgName("rate")
+ .hasArg()
+ .withDescription("Rate (events per second)")
+ .create("r"));
+
+ options.addOption(OptionBuilder.withArgName("display_rate")
+ .hasArg()
+ .withDescription("Display Rate at specified second boundary")
+ .create("d"));
+
+ options.addOption(OptionBuilder.withArgName("start_boundary")
+ .hasArg()
+ .withDescription("Start boundary in seconds")
+ .create("b"));
+
+ options.addOption(OptionBuilder.withArgName("run_for")
+ .hasArg()
+ .withDescription("Run for a specified number of seconds")
+ .create("x"));
+
+ options.addOption(OptionBuilder.withArgName("cluster_manager")
+ .hasArg()
+ .withDescription("Cluster manager")
+ .create("z"));
+
+ options.addOption(OptionBuilder.withArgName("sender_application_name")
+ .hasArg()
+ .withDescription("Sender application name")
+ .create("a"));
+
+ options.addOption(OptionBuilder.withArgName("listener_application_name")
+ .hasArg()
+ .withDescription("Listener application name")
+ .create("g"));
+
+ options.addOption(OptionBuilder.withArgName("sleep_overhead")
+ .hasArg()
+ .withDescription("Sleep overhead")
+ .create("o"));
+
+ options.addOption(new Option("w", "Warm-up"));
+
+ CommandLineParser parser = new GnuParser();
+
+ CommandLine line = null;
+ try {
+ // parse the command line arguments
+ line = parser.parse(options, args);
+ } catch (ParseException exp) {
+ // oops, something went wrong
+ System.err.println("Parsing failed. Reason: " + exp.getMessage());
+ System.exit(1);
+ }
+
+ int expectedRate = 250;
+ if (line.hasOption("r")) {
+ try {
+ expectedRate = Integer.parseInt(line.getOptionValue("r"));
+ } catch (Exception e) {
+ System.err.println("Bad expected rate specified "
+ + line.getOptionValue("r"));
+ System.exit(1);
+ }
+ }
+
+ int displayRateIntervalSeconds = 20;
+ if (line.hasOption("d")) {
+ try {
+ displayRateIntervalSeconds = Integer.parseInt(line.getOptionValue("d"));
+ } catch (Exception e) {
+ System.err.println("Bad display rate value specified "
+ + line.getOptionValue("d"));
+ System.exit(1);
+ }
+ }
+
+ int startBoundary = 2;
+ if (line.hasOption("b")) {
+ try {
+ startBoundary = Integer.parseInt(line.getOptionValue("b"));
+ } catch (Exception e) {
+ System.err.println("Bad start boundary value specified "
+ + line.getOptionValue("b"));
+ System.exit(1);
+ }
+ }
+
+ int updateFrequency = 0;
+ if (line.hasOption("f")) {
+ try {
+ updateFrequency = Integer.parseInt(line.getOptionValue("f"));
+ } catch (Exception e) {
+ System.err.println("Bad query udpdate frequency specified "
+ + line.getOptionValue("f"));
+ System.exit(1);
+ }
+ System.out.printf("Update frequency is %d\n", updateFrequency);
+ }
+
+ int runForTime = 0;
+ if (line.hasOption("x")) {
+ try {
+ runForTime = Integer.parseInt(line.getOptionValue("x"));
+ } catch (Exception e) {
+ System.err.println("Bad run for time specified "
+ + line.getOptionValue("x"));
+ System.exit(1);
+ }
+ System.out.printf("Run for time is %d\n", runForTime);
+ }
+
+ String clusterManagerAddress = null;
+ if (line.hasOption("z")) {
+ clusterManagerAddress = line.getOptionValue("z");
+ }
+
+ String senderApplicationName = null;
+ if (line.hasOption("a")) {
+ senderApplicationName = line.getOptionValue("a");
+ }
+
+ String listenerApplicationName = null;
+ if (line.hasOption("a")) {
+ listenerApplicationName = line.getOptionValue("g");
+ }
+
+ if (listenerApplicationName == null) {
+ listenerApplicationName = senderApplicationName;
+ }
+
+ long sleepOverheadMicros = -1;
+ if (line.hasOption("o")) {
+ try {
+ sleepOverheadMicros = Long.parseLong(line.getOptionValue("o"));
+ } catch (NumberFormatException e) {
+ System.err.println("Bad sleep overhead specified "
+ + line.getOptionValue("o"));
+ System.exit(1);
+ }
+ System.out.printf("Specified sleep overhead is %d\n",
+ sleepOverheadMicros);
+ }
+
+ if (line.hasOption("w")) {
+ warmUp = true;
+ }
+
+ List loArgs = line.getArgList();
+ if (loArgs.size() < 1) {
+ System.err.println("No input file specified");
+ System.exit(1);
+ }
+
+ String inputFilename = (String) loArgs.get(0);
+
+ EventEmitter emitter = null;
+
+ SerializerDeserializer serDeser = new KryoSerDeser();
+
+ CommLayerEmitter clEmitter = new CommLayerEmitter();
+ clEmitter.setAppName(senderApplicationName);
+ clEmitter.setListenerAppName(listenerApplicationName);
+ clEmitter.setClusterManagerAddress(clusterManagerAddress);
+ clEmitter.setSenderId(String.valueOf(System.currentTimeMillis() / 1000));
+ clEmitter.setSerDeser(serDeser);
+ clEmitter.init();
+ emitter = clEmitter;
+
+ long endTime = 0;
+ if (runForTime > 0) {
+ endTime = System.currentTimeMillis() + (runForTime * 1000);
+ }
+
+ LoadGenerator loadGenerator = new LoadGenerator();
+ loadGenerator.setInputFilename(inputFilename);
+ loadGenerator.setEventEmitter(clEmitter);
+ loadGenerator.setDisplayRateInterval(displayRateIntervalSeconds);
+ loadGenerator.setExpectedRate(expectedRate);
+ loadGenerator.run();
+
+ System.exit(0);
+ }
+
+ private EventEmitter eventEmitter;
+ private String inputFilename;
+ private int emitCount;
+ private int displayRateInterval = 0;
+
+ private int expectedRate = 200;
+ private int adjustedExpectedRate = 1;
+ private long sleepOverheadMicros = -1;
+ private static int PROCESS_TIME_LIST_MAX_SIZE = 15;
+ private long[] processTimes = new long[PROCESS_TIME_LIST_MAX_SIZE];
+ private int processTimePointer = 0;
+ private Map<Integer, EventTypeInfo> eventTypeInfoMap = new HashMap<Integer, EventTypeInfo>();
+
+ public int getEmitCount() {
+ return emitCount;
+ }
+
+ public void setEventEmitter(EventEmitter eventEmitter) {
+ this.eventEmitter = eventEmitter;
+ }
+
+ public void setInputFilename(String inputFilename) {
+ this.inputFilename = inputFilename;
+ }
+
+ public void setDisplayRateInterval(int displayRateInterval) {
+ this.displayRateInterval = displayRateInterval;
+ }
+
+ public void setSleepOverheadMicros(long sleepOverheadMicros) {
+ this.sleepOverheadMicros = sleepOverheadMicros;
+ }
+
+ public void setExpectedRate(int expectedRate) {
+ this.expectedRate = expectedRate;
+ }
+
+ private Random rand = new Random(System.currentTimeMillis());
+
+ public LoadGenerator() {
+ if (sleepOverheadMicros == -1) {
+ // calculate sleep overhead
+ long totalSleepOverhead = 0;
+ for (int i = 0; i < 50; i++) {
+ long startTime = System.nanoTime();
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException ie) {
+ }
+ totalSleepOverhead += (System.nanoTime() - startTime)
+ - (1 * 1000 * 1000);
+ }
+ sleepOverheadMicros = (totalSleepOverhead / 50) / 1000;
+ }
+ System.out.println("Sleep overhead is " + sleepOverheadMicros);
+ }
+
+ public void run() {
+ // for now, no warm-up mechanism
+ adjustedExpectedRate = expectedRate;
+
+ long startTime = 0;
+ long intervalStart = 0;
+ int emitCountStart = 0;
+ long[] rateInfo = new long[2];
+ rateInfo[0] = 100; // start with a sleep time of 100
+
+ BufferedReader br = null;
+ Reader inputReader = null;
+ try {
+ if (inputFilename.equals("-")) {
+ inputReader = new InputStreamReader(System.in);
+ } else {
+ inputReader = new FileReader(inputFilename);
+ }
+ br = new BufferedReader(inputReader);
+ String inputLine = null;
+ boolean firstLine = true;
+ EventWrapper eventWrapper = null;
+ for (startTime = System.nanoTime(); (inputLine = br.readLine()) != null; startTime = System.nanoTime()) {
+ if (firstLine) {
+ JSONObject jsonRecord = new JSONObject(inputLine);
+ createEventTypeInfo(jsonRecord);
+ System.out.println(eventTypeInfoMap);
+ if (eventTypeInfoMap.size() == 0) {
+ return;
+ }
+ firstLine = false;
+ continue;
+ }
+
+ try {
+ JSONObject jsonRecord = new JSONObject(inputLine);
+ int classIndex = jsonRecord.getInt("_index");
+ EventTypeInfo eventTypeInfo = eventTypeInfoMap.get(classIndex);
+ if (eventTypeInfo == null) {
+ System.err.printf("Invalid _index value %d\n",
+ classIndex);
+ return;
+ }
+
+ Object event = makeRecord(jsonRecord,
+ eventTypeInfo.getSchema());
+ eventWrapper = new EventWrapper(eventTypeInfo.getStreamName(),
+ event,
+ new ArrayList<CompoundKeyInfo>());
+ // System.out.println(eventWrapper.getStreamName() + ": " +
+ // eventWrapper.getEvent());
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.err.printf("Bad input data %s\n", inputLine);
+ continue;
+ }
+
+ int partition = Math.abs(rand.nextInt())
+ % eventEmitter.getNodeCount();
+
+ eventEmitter.emit(partition, eventWrapper);
+ emitCount++;
+
+ // the rest of the stuff in this block is just to maintain the
+ // rate
+ processTimes[processTimePointer] = System.nanoTime()
+ - startTime;
+ processTimePointer = (processTimePointer == PROCESS_TIME_LIST_MAX_SIZE - 1) ? 0
+ : processTimePointer + 1;
+ if (emitCount == 1 || emitCount % 20 == 0) {
+ rateInfo = getRateInfo(rateInfo);
+ }
+
+ // if it's time, display the actual emit rate
+ if (intervalStart == 0) {
+ intervalStart = System.currentTimeMillis();
+ } else {
+ long interval = System.currentTimeMillis() - intervalStart;
+ if (interval >= (displayRateInterval * 1000)) {
+ double rate = (emitCount - emitCountStart)
+ / (interval / 1000.0);
+ System.out.println("Rate is " + rate);
+ intervalStart = System.currentTimeMillis();
+ emitCountStart = emitCount;
+ }
+ }
+
+ if (rateInfo[1] == 0 || emitCount % rateInfo[1] == 0) {
+ try {
+ Thread.sleep(rateInfo[0]);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+ System.out.printf("Emitted %d events\n", emitCount);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ br.close();
+ } catch (Exception e) {
+ }
+ try {
+ inputReader.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void createEventTypeInfo(JSONObject classInfo) {
+ String className = "";
+ try {
+ for (Iterator it = classInfo.keys(); it.hasNext();) {
+ className = (String) it.next();
+ JSONObject jsonEventTypeInfo = classInfo.getJSONObject(className);
+ int classIndex = (Integer) jsonEventTypeInfo.getInt("classIndex");
+ String streamName = jsonEventTypeInfo.getString("streamName");
+
+ Class clazz = Class.forName(className);
+ Schema schema = new Schema(clazz);
+ eventTypeInfoMap.put(classIndex, new EventTypeInfo(schema,
+ streamName));
+ }
+ } catch (JSONException je) {
+ je.printStackTrace();
+ } catch (ClassNotFoundException cnfe) {
+ System.err.println("Count not locate class " + className);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Object makeRecord(JSONObject jsonRecord, Schema schema) {
+ Object event = null;
+ try {
+ event = schema.getType().newInstance();
+
+ for (Iterator it = jsonRecord.keys(); it.hasNext();) {
+ String propertyName = (String) it.next();
+
+ Property property = schema.getProperties().get(propertyName);
+
+ if (property == null) {
+ continue; // not in schema, just continue
+ }
+
+ Method setterMethod = property.getSetterMethod();
+ Object value = jsonRecord.get(propertyName);
+ if (value.equals(JSONObject.NULL)) {
+ continue;
+ }
+
+ setterMethod.invoke(event, makeSettableValue(property, value));
+
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return event;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Object makeSettableValue(Property property, Object value) {
+ String propertyName = property.getName();
+ Class propertyType = property.getType();
+
+ if (propertyType.isArray()) {
+ if (!(value instanceof JSONArray)) {
+ System.err.println("Type mismatch for field " + propertyName);
+ return null;
+ }
+ System.out.println("Is array!");
+ return makeArray(property, (JSONArray) value);
+ } else if (property.isList()) {
+ if (!(value instanceof JSONArray)) {
+ System.err.println("Type mismatch for field " + propertyName);
+ return null;
+ }
+ return makeList(property, (JSONArray) value);
+ } else if (propertyType.isPrimitive()) {
+ if (!(value instanceof Number || value instanceof Boolean)) {
+ System.err.println("Type mismatch for field " + propertyName
+ + "; expected number or boolean, found "
+ + value.getClass());
+ return null;
+ }
+ return value; // hmm... does this work?
+ } else if (propertyType.equals(String.class)) {
+ if (!(value instanceof String)) {
+ System.err.println("Type mismatch for field " + propertyName
+ + "; expected String, found " + value.getClass());
+ return null;
+ }
+ return value;
+ } else if (property.isNumber()) {
+ if (!(value instanceof Integer || value instanceof Long
+ || value instanceof Float || value instanceof Double
+ || value instanceof BigDecimal || value instanceof BigInteger)) {
+ return null;
+ }
+
+ Number adjustedValue = (Number) value;
+ if (propertyType.equals(Long.class) && !(value instanceof Long)) {
+ adjustedValue = new Long(((Number) value).longValue());
+ } else if (propertyType.equals(Integer.class)
+ && !(value instanceof Integer)) {
+ adjustedValue = new Integer(((Number) value).intValue());
+ } else if (propertyType.equals(Double.class)
+ && !(value instanceof Double)) {
+ adjustedValue = new Double(((Number) value).doubleValue());
+ } else if (propertyType.equals(Float.class)
+ && !(value instanceof Float)) {
+ adjustedValue = new Float(((Number) value).floatValue());
+ } else if (propertyType.equals(BigDecimal.class)) {
+ adjustedValue = new BigDecimal(((Number) value).longValue());
+ } else if (propertyType.equals(BigInteger.class)) {
+ adjustedValue = BigInteger.valueOf(((Number) value).longValue());
+ }
+ return adjustedValue;
+ } else if (value instanceof JSONObject) {
+ return makeRecord((JSONObject) value, property.getSchema());
+ }
+
+ return null;
+ }
+
+ public static Object makeList(Property property, JSONArray jsonArray) {
+ Property componentProperty = property.getComponentProperty();
+
+ int size = jsonArray.length();
+
+ List<Object> list = new ArrayList<Object>(size);
+
+ try {
+ for (int i = 0; i < size; i++) {
+ Object value = jsonArray.get(i);
+ list.add(makeSettableValue(componentProperty, value));
+ }
+ } catch (JSONException je) {
+ throw new RuntimeException(je);
+ }
+
+ return list;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Object makeArray(Property property, JSONArray jsonArray) {
+ Property componentProperty = property.getComponentProperty();
+ Class clazz = componentProperty.getType();
+
+ int size = jsonArray.length();
+
+ Object array = Array.newInstance(clazz, size);
+
+ try {
+ for (int i = 0; i < size; i++) {
+ Object value = jsonArray.get(i);
+ Object adjustedValue = makeSettableValue(componentProperty,
+ value);
+ Array.set(array, i, adjustedValue);
+ }
+ } catch (JSONException je) {
+ throw new RuntimeException(je);
+ }
+ return array;
+ }
+
+ private long[] getRateInfo(long[] rateInfo) {
+ long totalTimeNanos = 0;
+ int entryCount = 0;
+ for (int i = 0; i < processTimes.length; i++) {
+ if (processTimes[i] == Long.MIN_VALUE) {
+ break;
+ }
+ entryCount++;
+ totalTimeNanos += processTimes[i];
+ }
+ long averageTimeMicros = (long) ((totalTimeNanos / (double) entryCount) / 1000.0);
+ // fudge the time for additional overhead
+ averageTimeMicros += (long) (averageTimeMicros * 0.30);
+
+ if (emitCount % 5000 == 0) {
+ // System.out.println("Average time in micros is " +
+ // averageTimeMicros);
+ }
+
+ long sleepTimeMicros = 0;
+ long millis = 0;
+
+ long timeToMeetRateMicros = adjustedExpectedRate * averageTimeMicros;
+ long leftOver = 1000000 - timeToMeetRateMicros;
+ if (leftOver <= 0) {
+ sleepTimeMicros = 0;
+ } else {
+ sleepTimeMicros = (leftOver / adjustedExpectedRate)
+ - sleepOverheadMicros;
+ }
+
+ // how many events can be processed in the nanos time?
+ int eventsBeforeSleep = 1;
+ if (sleepTimeMicros < 1000) {
+ sleepTimeMicros = 1000 + sleepOverheadMicros;
+ millis = 1;
+ double numNapsDouble = ((double) leftOver / sleepTimeMicros);
+ int numNaps = (int) Math.ceil(numNapsDouble);
+ if (numNaps > 0) {
+ eventsBeforeSleep = adjustedExpectedRate / numNaps;
+ }
+
+ if (leftOver <= 0) {
+ millis = 0;
+ eventsBeforeSleep = 1000;
+ }
+ } else {
+ millis = sleepTimeMicros / 1000;
+ }
+
+ rateInfo[0] = millis;
+ rateInfo[1] = eventsBeforeSleep;
+ return rateInfo;
+ }
+
+ public static class EventTypeInfo {
+ private Schema schema;
+ private String streamName;
+
+ public EventTypeInfo(Schema schema, String streamName) {
+ this.schema = schema;
+ this.streamName = streamName;
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/MethodInvoker.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/MethodInvoker.java b/s4-core/src/main/java/org/apache/s4/util/MethodInvoker.java
new file mode 100644
index 0000000..a98c46a
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/MethodInvoker.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.util;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Utility class to invoke a method on an arbitrary object, if such a method is
+ * defined.
+ */
+public class MethodInvoker {
+
+ /**
+ * Find and invoke a getter on an object. A getter for parameter N is a
+ * public method whose name equals "get" + N, ignoring case, and which takes
+ * zero arguments. If no such method is found, an exception is thrown.
+ *
+ * @param obj
+ * object on which getter id to be invoked
+ * @param name
+ * parameter name
+ * @return value returned by the getter method, if such a method is found.
+ * Null if the object is null.
+ * @throws Exception
+ * if no suitable getter is found, or if the getter method
+ * throws an exception. The latter is wrapped in an
+ * {@link InvocationTargetException}
+ */
+ public static Object invokeGetter(Object obj, String name) throws Exception {
+ if (obj != null) {
+ Method getter = findGetter(obj.getClass(), name);
+
+ if (getter != null) {
+ return getter.invoke(obj);
+
+ } else {
+ throw new NoGetterException(obj.getClass(), name);
+ }
+
+ } else {
+ throw new Exception("Null Target");
+ }
+ }
+
+ private static ConcurrentHashMap<Class<?>, HashMap<String, Method>> gettersMap = new ConcurrentHashMap<Class<?>, HashMap<String, Method>>();
+
+ private static Method findGetter(Class<?> clazz, String name) {
+ HashMap<String, Method> getters = gettersMap.get(clazz);
+
+ if (getters == null) {
+ HashMap<String, Method> newGetters = reflectGetters(clazz);
+
+ getters = gettersMap.putIfAbsent(clazz, newGetters);
+
+ if (getters == null)
+ getters = newGetters;
+ }
+
+ return getters.get(name.toLowerCase());
+ }
+
+ private static HashMap<String, Method> reflectGetters(Class<?> clazz) {
+ HashMap<String, Method> map = new HashMap<String, Method>();
+
+ for (Method m : clazz.getMethods()) {
+ // the method we are interested in should be named get* and take no
+ // arguments.
+ String n = m.getName();
+
+ if (m.getParameterTypes().length == 0 && n.startsWith("get")) {
+ String name = n.substring(3).toLowerCase();
+
+ map.put(name, m);
+ }
+ }
+
+ return map;
+ }
+
+ private static Class<?>[] getTypes(Object[] args) {
+ Class<?>[] aT = new Class<?>[args.length];
+ for (int i = 0; i < args.length; ++i) {
+ aT[i] = args[i].getClass();
+ }
+
+ return aT;
+ }
+
+ public static class NoGetterException extends Exception {
+ public NoGetterException(Class<?> clazz, String name) {
+ super("No Getter for attribute " + clazz.getName() + "." + name);
+ }
+ }
+
+ public static class NoMethodException extends Exception {
+ public NoMethodException(Class<?> clazz, String name, Object[] args) {
+ super("No method found " + clazz.getName() + "." + name + "("
+ + MethodInvoker.getTypes(args) + ")");
+ }
+ }
+
+ public static class NullTargetException extends Exception {
+ public NullTargetException() {
+ super();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/MetricsName.java b/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
new file mode 100644
index 0000000..73762e4
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.util;
+
+public enum MetricsName {
+ // metrics event name
+ S4_APP_METRICS("S4::S4AppMetrics"), S4_EVENT_METRICS("S4::S4EventMetrics"), S4_CORE_METRICS(
+ "S4::S4CoreMetrics"),
+
+ // metrics name
+ low_level_listener_msg_in_ct("lll_in"), low_level_listener_msg_drop_ct(
+ "lll_dr"), low_level_listener_qsz("lll_qsz"), low_level_listener_badmsg_ct(
+ "lll_bad"), // exception can't be caught
+ generic_listener_msg_in_ct("gl_in"), pecontainer_ev_dq_ct("pec_dq"), pecontainer_ev_nq_ct(
+ "pec_nq"), pecontainer_msg_drop_ct("pec_dr"), pecontainer_qsz(
+ "pec_qsz"), pecontainer_qsz_w("pec_qsz_w"), pecontainer_ev_process_ct(
+ "pec_pr"), pecontainer_pe_ct("pec_pe"), pecontainer_ev_err_ct(
+ "pec_err"), // exception can't be caught
+ pecontainer_exec_elapse_time("pec_exec_t"), low_level_emitter_msg_out_ct(
+ "lle_out"), low_level_emitter_out_err_ct("lle_err"), low_level_emitter_qsz(
+ "lle_qsz"), s4_core_exit_ct("s4_ex_ct"), s4_core_free_mem("s4_fmem"), pe_join_ev_ct(
+ "pe_j_ct"), pe_error_count("pe_err");
+
+ private final String eventShortName;
+
+ private MetricsName(String eventShortName) {
+ this.eventShortName = eventShortName;
+ }
+
+ public String toString() {
+ return eventShortName;
+ }
+
+ public static void main(String[] args) {
+ System.out.println(generic_listener_msg_in_ct.toString());
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/MiscConstants.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/MiscConstants.java b/s4-core/src/main/java/org/apache/s4/util/MiscConstants.java
new file mode 100644
index 0000000..7a072f0
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/MiscConstants.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.util;
+
+public class MiscConstants {
+ public final static String EVENT_WRAPPER_SCHEMA_NAME = "EventWrapper";
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/NumberUtils.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/NumberUtils.java b/s4-core/src/main/java/org/apache/s4/util/NumberUtils.java
new file mode 100644
index 0000000..708320e
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/NumberUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.util;
+
+import java.math.BigInteger;
+
+public class NumberUtils {
+
+ private final static BigInteger B64 = BigInteger.ZERO.setBit(64);
+
+ public static BigInteger getLongAsUnsignedBigInteger(long number) {
+ if (number >= 0)
+ return BigInteger.valueOf(number);
+ return BigInteger.valueOf(number).add(B64);
+ }
+
+ public static String getUnsignedBigIntegerAsHex(BigInteger bi) {
+ String hexString = bi.toString(16);
+ if (hexString.length() < 16) {
+ String zeroes = "000000000000000";
+ hexString = zeroes.substring(0, 16 - hexString.length())
+ + hexString;
+ }
+ return hexString.toUpperCase();
+ }
+
+ public static void main(String args[]) {
+ BigInteger bi = getLongAsUnsignedBigInteger(0x00f1200000004561L);
+ System.out.println(getUnsignedBigIntegerAsHex(bi));
+
+ bi = getLongAsUnsignedBigInteger(0x80f12000dd004561L);
+ System.out.println(getUnsignedBigIntegerAsHex(bi));
+
+ bi = getLongAsUnsignedBigInteger(0x1161L);
+ System.out.println(getUnsignedBigIntegerAsHex(bi));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/PreprodLogger.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/PreprodLogger.java b/s4-core/src/main/java/org/apache/s4/util/PreprodLogger.java
new file mode 100644
index 0000000..6b4d33d
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/PreprodLogger.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.util;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.text.DecimalFormat;
+
+import org.apache.log4j.Logger;
+
+public class PreprodLogger {
+
+ String filenamePrefix;
+ File file;
+ FileOutputStream fos;
+
+ private DecimalFormat formatter = new DecimalFormat("0000");
+
+ public void setFilenamePrefix(String filenamePrefix) {
+ this.filenamePrefix = filenamePrefix;
+ }
+
+ public PreprodLogger() {
+ }
+
+ public void openNewFile() {
+ for (int count = 1; true; count++) {
+ String countString = formatter.format(count);
+ String filename = filenamePrefix + "." + countString + ".txt";
+ file = new File(filename);
+ if (!file.exists()) {
+ break;
+ }
+ }
+
+ try {
+ fos = new FileOutputStream(file);
+ } catch (IOException ioe) {
+ Logger.getLogger("s4")
+ .error("Some sort of exception opening event logging file",
+ ioe);
+ return;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/ReverseDoubleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/ReverseDoubleOutputFormatter.java b/s4-core/src/main/java/org/apache/s4/util/ReverseDoubleOutputFormatter.java
new file mode 100644
index 0000000..06b72f4
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/ReverseDoubleOutputFormatter.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.util;
+
+import org.apache.s4.processor.OutputFormatter;
+
+public class ReverseDoubleOutputFormatter implements OutputFormatter {
+
+ @Override
+ public Object format(Object outputValue) {
+ Double doubleObject = (Double) outputValue;
+ return String.valueOf(doubleObject);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/ReverseIntegerOutputFormatter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/ReverseIntegerOutputFormatter.java b/s4-core/src/main/java/org/apache/s4/util/ReverseIntegerOutputFormatter.java
new file mode 100644
index 0000000..1ee2954
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/ReverseIntegerOutputFormatter.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.util;
+
+import org.apache.s4.processor.OutputFormatter;
+
+public class ReverseIntegerOutputFormatter implements OutputFormatter {
+
+ @Override
+ public Object format(Object outputValue) {
+ Integer integerObject = (Integer) outputValue;
+ return String.valueOf(integerObject);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/S4Util.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/S4Util.java b/s4-core/src/main/java/org/apache/s4/util/S4Util.java
new file mode 100644
index 0000000..99a2302
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/S4Util.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.util;
+
+public class S4Util {
+ public static long getPID() {
+ String processName = java.lang.management.ManagementFactory.getRuntimeMXBean()
+ .getName();
+ return Long.parseLong(processName.split("@")[0]);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/SlotUtils.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/SlotUtils.java b/s4-core/src/main/java/org/apache/s4/util/SlotUtils.java
new file mode 100644
index 0000000..9e456c1
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/SlotUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.util;
+
+
+public class SlotUtils {
+
+ private int slotSize; // slot size in seconds
+
+ public SlotUtils(int slotSize) {
+ this.slotSize = slotSize;
+ }
+
+ public void setSize(int slotSize) {
+ this.slotSize = slotSize;
+ }
+
+ public Long getSlotAtTime(long time) {
+ return slotSize * (long) Math.floor(time / slotSize);
+ }
+
+ public Long getCurrentSlot() {
+ long currTimeStamp = System.currentTimeMillis() / 1000; // convert to
+ // seconds
+ Long slotTimeStamp = getSlotAtTime(currTimeStamp);
+ return slotTimeStamp;
+ }
+
+ public Long getSlot(int index, long currTimeStamp) {
+ Long slotTimeStamp = getSlotAtTime(currTimeStamp + index * slotSize);
+ return slotTimeStamp;
+ }
+
+ public boolean isOutsideWindow(Long slot, int windowSize, long time) {
+ boolean outside = false;
+ long windowBoundary = getSlotAtTime(time) - windowSize;
+ if (slot.longValue() < windowBoundary) {
+ outside = true;
+ }
+ return outside;
+ }
+
+ public static void main(String[] args) {
+ SlotUtils s = new SlotUtils(300);
+ System.out.printf("%d\n", s.getCurrentSlot());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/ToStringOutputFormatter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/ToStringOutputFormatter.java b/s4-core/src/main/java/org/apache/s4/util/ToStringOutputFormatter.java
new file mode 100644
index 0000000..4035709
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/ToStringOutputFormatter.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.util;
+
+import org.apache.s4.processor.OutputFormatter;
+
+public class ToStringOutputFormatter implements OutputFormatter {
+
+ @Override
+ public Object format(Object outputValue) {
+ return String.valueOf(outputValue);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/Watcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/Watcher.java b/s4-core/src/main/java/org/apache/s4/util/Watcher.java
new file mode 100644
index 0000000..39a9542
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/Watcher.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.util;
+
+import static org.apache.s4.util.MetricsName.S4_CORE_METRICS;
+import static org.apache.s4.util.MetricsName.s4_core_exit_ct;
+import static org.apache.s4.util.MetricsName.s4_core_free_mem;
+import org.apache.s4.logger.Monitor;
+import org.apache.s4.persist.Persister;
+import org.apache.s4.processor.AsynchronousEventProcessor;
+
+import java.io.File;
+import java.text.MessageFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+
+import org.apache.log4j.Logger;
+
+public class Watcher implements Runnable {
+ Runtime rt = Runtime.getRuntime();
+ AsynchronousEventProcessor peContainer;
+ Persister persister;
+ Persister localPersister;
+ String configFilename;
+ long configFileTime = -1;
+ Monitor monitor;
+
+ public void setMonitor(Monitor monitor) {
+ this.monitor = monitor;
+ }
+
+ private long minimumMemory = 200 * 1024 * 1024;
+
+ SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
+
+ public void setMinimumMemory(long minimumMemory) {
+ this.minimumMemory = minimumMemory;
+ }
+
+ public void setPeContainer(AsynchronousEventProcessor peContainer) {
+ this.peContainer = peContainer;
+ }
+
+ public void setPersister(Persister persister) {
+ this.persister = persister;
+ }
+
+ public void setLocalPersister(Persister localPersister) {
+ this.localPersister = localPersister;
+ }
+
+ public void setConfigFilename(String configFilename) {
+ this.configFilename = configFilename;
+ }
+
+ public Watcher() {
+
+ }
+
+ public void init() {
+ Thread t = new Thread(this);
+ t.start();
+ }
+
+ public void run() {
+ try {
+ while (true) {
+ String stringTime = dateFormatter.format(new Date());
+
+ String template1 = "{0,number,#######0} waiting processing";
+ Logger.getLogger("s4")
+ .info(MessageFormat.format(template1,
+ peContainer.getQueueSize()));
+
+ String template2 = "Total: {0,number,#######0}, max {1,number,#######0}, free {2,number,#######0}";
+ Logger.getLogger("s4")
+ .info(MessageFormat.format(template2,
+ rt.totalMemory(),
+ rt.maxMemory(),
+ rt.freeMemory()));
+ memoryCheck();
+ configCheck();
+ try {
+ Thread.sleep(15000);
+ } catch (Exception e) {
+ }
+ }
+ } catch (Exception e) {
+ Logger.getLogger("s4")
+ .warn("Some sort of exception in Watcher thread", e);
+ try {
+ Thread.sleep(30000);
+ } catch (Exception ie) {
+ }
+ }
+ }
+
+ private ArrayList<byte[]> memoryHog = new ArrayList<byte[]>();
+
+ private void memoryCheck() {
+ long total = rt.totalMemory();
+ long max = rt.maxMemory();
+ long free = rt.freeMemory();
+ long actualFree = (max - total) + free;
+
+ try {
+ if (monitor != null) {
+ monitor.set(s4_core_free_mem.toString(),
+ (int) (actualFree / 1024 / 1024.0),
+ S4_CORE_METRICS.toString());
+ }
+
+ if (actualFree < minimumMemory) {
+ Logger.getLogger("s4").error("Too little memory remaining: "
+ + actualFree + ". Exiting so process can be restarted");
+ if (monitor != null) {
+ monitor.set(s4_core_exit_ct.toString(),
+ 1,
+ S4_CORE_METRICS.toString());
+ }
+ System.exit(3);
+ }
+ } catch (Exception e) {
+ Logger.getLogger("s4").error("metrics name doesn't exist: ", e);
+ }
+
+ // TODO: Comment this out!!
+ // memoryHog.add(new byte[10*1024*1024]);
+ }
+
+ private void configCheck() {
+ if (configFilename == null) {
+ return;
+ }
+
+ File file = new File(configFilename);
+ if (!file.exists()) {
+ return;
+ }
+ long lastModified = file.lastModified();
+ if (configFileTime == -1) {
+ configFileTime = lastModified;
+ return;
+ }
+
+ if (lastModified > configFileTime) {
+ Logger.getLogger("s4").info("Config file has changed. Exiting!!");
+ try {
+ if (monitor != null) {
+ monitor.set(s4_core_exit_ct.toString(),
+ 1,
+ S4_CORE_METRICS.toString());
+ }
+ } catch (Exception e) {
+ Logger.getLogger("s4").error("metrics name doesn't exist: ", e);
+ }
+ System.exit(4);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/clock/Clock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/clock/Clock.java b/s4-core/src/main/java/org/apache/s4/util/clock/Clock.java
new file mode 100644
index 0000000..cf9ffce
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/clock/Clock.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+
+package org.apache.s4.util.clock;
+
+public interface Clock {
+
+ public long waitForTime(long targetTime);
+
+ public long getCurrentTime();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/clock/ClockStreamsLoader.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/clock/ClockStreamsLoader.java b/s4-core/src/main/java/org/apache/s4/util/clock/ClockStreamsLoader.java
new file mode 100644
index 0000000..542405d
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/clock/ClockStreamsLoader.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+
+package org.apache.s4.util.clock;
+
+
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+
+public class ClockStreamsLoader {
+
+ private static Logger logger = Logger.getLogger(ClockStreamsLoader.class);
+ Clock s4Clock;
+ HashMap<String, String> streamFieldMap;
+
+ public void setStreamFieldMap(HashMap<String, String> streamFieldMap) {
+ this.streamFieldMap = streamFieldMap;
+ }
+
+ public void setS4Clock(Clock s4Clock) {
+ this.s4Clock = s4Clock;
+ }
+
+ public void addStreams() {
+ if (s4Clock instanceof EventClock) {
+ EventClock eventClock = (EventClock) s4Clock;
+ for (String streamName : streamFieldMap.keySet()) {
+ eventClock.addEventClockStream(streamName,
+ streamFieldMap.get(streamName));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/clock/DrivenClock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/clock/DrivenClock.java b/s4-core/src/main/java/org/apache/s4/util/clock/DrivenClock.java
new file mode 100644
index 0000000..d30469a
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/clock/DrivenClock.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+
+package org.apache.s4.util.clock;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+public class DrivenClock implements Clock {
+
+ private volatile long currentTime;
+ private NavigableMap<Long, List<TimerRequest>> timerRequests = new TreeMap<Long, List<TimerRequest>>();
+
+ public void updateTime(long newCurrentTime) {
+ if (newCurrentTime < currentTime) {
+ return;
+ }
+ List<TimerRequest> relevantRequests = null;
+ synchronized (timerRequests) {
+ currentTime = newCurrentTime;
+ while (true) {
+ // inspect the top of the timer request list and see if any
+ // request
+ // is
+ // satisfied by the new current time
+ Entry<Long, List<TimerRequest>> entry = timerRequests
+ .firstEntry();
+ if (entry == null || entry.getKey() > newCurrentTime) {
+ break;
+ }
+ relevantRequests = timerRequests.remove(entry.getKey());
+ }
+ if (relevantRequests != null) {
+ for (TimerRequest timerRequest : relevantRequests) {
+ timerRequest.wakeUp(newCurrentTime);
+ }
+ }
+ }
+ }
+
+ public long waitForTime(long targetTime) {
+ TimerRequest timerRequest = null;
+ synchronized (timerRequests) {
+ if (targetTime <= currentTime) {
+ return currentTime;
+ }
+ timerRequest = new TimerRequest(targetTime);
+ List<TimerRequest> requestsForTargetTime = timerRequests.get(targetTime);
+ if (requestsForTargetTime == null) {
+ requestsForTargetTime = new ArrayList<TimerRequest>();
+ timerRequests.put(targetTime, requestsForTargetTime);
+ }
+ requestsForTargetTime.add(timerRequest);
+ }
+ return timerRequest.waitForTargetTime();
+ }
+
+
+ public long getCurrentTime() {
+ return getCurrentTime(true);
+ }
+
+ public long getCurrentTime(boolean waitOnInitialization) {
+ if (currentTime == 0 && waitOnInitialization) {
+ // if tick has never been called, wait for it to be called once
+ this.waitForTime(1);
+ }
+ return currentTime;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/clock/EventClock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/clock/EventClock.java b/s4-core/src/main/java/org/apache/s4/util/clock/EventClock.java
new file mode 100644
index 0000000..96fcd86
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/clock/EventClock.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+
+package org.apache.s4.util.clock;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.schema.Schema;
+import org.apache.s4.schema.Schema.Property;
+import org.apache.s4.schema.SchemaContainer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class EventClock extends DrivenClock {
+
+ private static Logger logger = Logger.getLogger(EventClock.class);
+
+ Map<String, String> eventClockStreamsMap = new HashMap<String, String>();
+ SchemaContainer schemaContainer = new SchemaContainer();
+
+ public void update(EventWrapper eventWrapper) {
+ long eventTime = -1;
+ String streamName = eventWrapper.getStreamName();
+ String fieldName = eventClockStreamsMap.get(streamName);
+ if (fieldName != null) {
+ Object event = eventWrapper.getEvent();
+ Schema schema = schemaContainer.getSchema(event.getClass());
+ Property property = schema.getProperties().get(fieldName);
+ if (property != null
+ && (property.getType().equals(Long.TYPE) || property
+ .getType().equals(Long.class))) {
+ try {
+ eventTime = (Long) property.getGetterMethod().invoke(event);
+ updateTime(eventTime);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ public void addEventClockStream(String streamName, String fieldName) {
+ String fieldNameInStream = eventClockStreamsMap.get(streamName);
+ if (fieldNameInStream != null) {
+ if (!fieldNameInStream.equals(fieldName)) {
+ // we can add an runtime exception over error messages for
+ // making debugging easy
+ logger.error("Stream " + streamName
+ + " already has a timestamp field defined "
+ + eventClockStreamsMap.get(streamName));
+ logger.error("Stream " + streamName
+ + " is updating the timestamp field to " + fieldName);
+ eventClockStreamsMap.put(streamName, fieldName);
+ }
+ } else {
+ eventClockStreamsMap.put(streamName, fieldName);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/clock/TimerRequest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/clock/TimerRequest.java b/s4-core/src/main/java/org/apache/s4/util/clock/TimerRequest.java
new file mode 100644
index 0000000..0666b69
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/clock/TimerRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+
+package org.apache.s4.util.clock;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TimerRequest {
+ private long targetTime;
+ private BlockingQueue<Long> blockingQueue = new LinkedBlockingQueue<Long>();
+
+ public TimerRequest(long targetTime) {
+ this.targetTime = targetTime;
+ }
+
+ public long getTargetTime() {
+ return targetTime;
+ }
+
+ public void wakeUp(long currentTime) {
+ blockingQueue.add(currentTime);
+ }
+
+ public long waitForTargetTime() {
+ try {
+ return blockingQueue.take();
+ } catch (InterruptedException ie) {
+ return -1;
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/clock/WallClock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/clock/WallClock.java b/s4-core/src/main/java/org/apache/s4/util/clock/WallClock.java
new file mode 100644
index 0000000..829798e
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/clock/WallClock.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+
+package org.apache.s4.util.clock;
+
+
+public class WallClock implements Clock {
+
+ @Override
+ public long waitForTime(long targetTime) {
+ long interval = (targetTime - getCurrentTime());
+ if (interval > 0) {
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ return getCurrentTime();
+ }
+
+ @Override
+ public long getCurrentTime() {
+ // TODO Auto-generated method stub
+ return System.currentTimeMillis();
+ }
+
+}