You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/03/14 18:21:14 UTC

[1/3] git commit: [HELIX-356] add a tool for grep zk transaction/snapshot logs based on time, rb=16935

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.2-release c6cb2c2c8 -> 6e587b2d1


[HELIX-356] add a tool for grep zk transaction/snapshot logs based on time,rb=16935


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/efc1defd
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/efc1defd
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/efc1defd

Branch: refs/heads/helix-0.6.2-release
Commit: efc1defd0910eb6137318c7e48253ee736265da5
Parents: c6cb2c2
Author: zzhang <zz...@uci.edu>
Authored: Wed Jan 15 17:56:27 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Mar 13 18:51:44 2014 -0700

----------------------------------------------------------------------
 helix-core/pom.xml                              |   8 +-
 .../java/org/apache/helix/tools/ZkGrep.java     | 641 +++++++++++++++++++
 .../org/apache/helix/tools/ZkLogAnalyzer.java   | 441 -------------
 3 files changed, 645 insertions(+), 445 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/efc1defd/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index 32ff9f9..cc6b86e 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -211,10 +211,6 @@ under the License.
               <name>start-standalone-zookeeper</name>
             </program>
             <program>
-              <mainClass>org.apache.helix.tools.ZkLogAnalyzer</mainClass>
-              <name>zk-log-analyzer</name>
-            </program>
-            <program>
               <mainClass>org.apache.helix.examples.Quickstart</mainClass>
               <name>quickstart</name>
             </program>
@@ -230,6 +226,10 @@ under the License.
               <mainClass>org.apache.helix.tools.IntegrationTestUtil</mainClass>
               <name>test-util</name>
             </program>
+            <program>
+              <mainClass>org.apache.helix.tools.ZkGrep</mainClass>
+              <name>zkgrep</name>
+            </program>
           </programs>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/helix/blob/efc1defd/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java b/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java
new file mode 100644
index 0000000..3abf78f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java
@@ -0,0 +1,641 @@
+package org.apache.helix.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * utility for grepping zk transaction/snapshot logs
+ * - to grep a pattern by t1 use:
+ * zkgrep --zkCfg zkCfg --by t1 --pattern patterns...
+ * - to grep a pattern between t1 and t2 use:
+ * zkgrep --zkCfg zkCfg --between t1 t2 --pattern patterns...
+ * for example, to find fail-over latency between t1 and t2, use:
+ * 1) zkgrep --zkCfg zkCfg --by t1 --pattern "/{cluster}/LIVEINSTANCES/" | grep {fail-node}
+ * 2) zkgrep --zkCfg zkCfg --between t1 t2 --pattern "closeSession" | grep {fail-node session-id}
+ * 3) zkgrep --zkCfg zkCfg --between t1 t2 --pattern "/{cluster}" | grep "CURRENTSTATES" |
+ * grep "setData" | tail -1
+ * fail-over latency = timestamp difference between 2) and 3)
+ */
+public class ZkGrep {
+  private static Logger LOG = Logger.getLogger(ZkGrep.class);
+
+  private static final String zkCfg = "zkCfg";
+  private static final String pattern = "pattern";
+  private static final String by = "by";
+  private static final String between = "between";
+
+  public static final String log = "log";
+  public static final String snapshot = "snapshot";
+
+  private static final String gzSuffix = ".gz";
+
+  @SuppressWarnings("static-access")
+  private static Options constructCommandLineOptions() {
+    Option zkCfgOption =
+        OptionBuilder.hasArgs(1).isRequired(false).withLongOpt(zkCfg).withArgName("zoo.cfg")
+            .withDescription("provide zoo.cfg").create();
+
+    Option patternOption =
+        OptionBuilder.hasArgs().isRequired(true).withLongOpt(pattern)
+            .withArgName("grep-patterns...").withDescription("provide patterns (required)")
+            .create();
+
+    Option betweenOption =
+        OptionBuilder.hasArgs(2).isRequired(false).withLongOpt(between)
+            .withArgName("t1 t2 (timestamp in ms or yyMMdd_hhmmss_SSS)")
+            .withDescription("grep between t1 and t2").create();
+
+    Option byOption =
+        OptionBuilder.hasArgs(1).isRequired(false).withLongOpt(by)
+            .withArgName("t (timestamp in ms or yyMMdd_hhmmss_SSS)").withDescription("grep by t")
+            .create();
+
+    OptionGroup group = new OptionGroup();
+    group.setRequired(true);
+    group.addOption(betweenOption);
+    group.addOption(byOption);
+
+    Options options = new Options();
+    options.addOption(zkCfgOption);
+    options.addOption(patternOption);
+    options.addOptionGroup(group);
+    return options;
+  }
+
+  /**
+   * get zk transaction log dir and zk snapshot log dir
+   * @param zkCfgFile
+   * @return String[0]: zk-transaction-log-dir, String[1]: zk-snapshot-dir
+   */
+  static String[] getZkDataDirs(String zkCfgFile) {
+    String[] zkDirs = new String[2];
+
+    FileInputStream fis = null;
+    BufferedReader br = null;
+    try {
+      fis = new FileInputStream(zkCfgFile);
+      br = new BufferedReader(new InputStreamReader(fis));
+
+      String line;
+      while ((line = br.readLine()) != null) {
+        String key = "dataDir=";
+        if (line.startsWith(key)) {
+          zkDirs[1] = zkDirs[0] = line.substring(key.length()) + "/version-2";
+        }
+
+        key = "dataLogDir=";
+        if (line.startsWith(key)) {
+          zkDirs[0] = line.substring(key.length()) + "/version-2";
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("exception in read file: " + zkCfgFile, e);
+    } finally {
+      try {
+        if (br != null) {
+          br.close();
+        }
+
+        if (fis != null) {
+          fis.close();
+        }
+
+      } catch (Exception e) {
+        LOG.error("exception in closing file: " + zkCfgFile, e);
+      }
+    }
+
+    return zkDirs;
+  }
+
+  // debug
+  static void printFiles(File[] files) {
+    System.out.println("START print");
+    for (int i = 0; i < files.length; i++) {
+      File file = files[i];
+      System.out.println(file.getName() + ", " + file.lastModified());
+    }
+    System.out.println("END print");
+  }
+
+  /**
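+   * get files under dirPath whose names contain pattern, oldest-modified first
+   * @param dirPath directory to list
+   * @param pattern substring that a file name must contain
+   * @return matching files sorted by last modified time, ascending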
+   */
+  static File[] getSortedFiles(String dirPath, final String pattern) {
+    File dir = new File(dirPath);
+    File[] files = dir.listFiles(new FileFilter() {
+
+      @Override
+      public boolean accept(File file) {
+        return file.isFile() && (file.getName().indexOf(pattern) != -1);
+      }
+    });
+
+    Arrays.sort(files, new Comparator<File>() {
+
+      @Override
+      public int compare(File o1, File o2) {
+        int sign = (int) Math.signum(o1.lastModified() - o2.lastModified());
+        return sign;
+      }
+
+    });
+    return files;
+  }
+
+  /**
+   * get value for an attribute in a parsed zk log; e.g.
+   * "time:1384984016778 session:0x14257d1d17e0004 cxid:0x5 zxid:0x46899 type:error err:-101"
+   * given "time" return "1384984016778"
+   * @param line
+   * @param attribute
+   * @return value
+   */
+  static String getAttributeValue(String line, String attribute) {
+    if (line == null) {
+      return null;
+    }
+
+    if (!attribute.endsWith(":")) {
+      attribute = attribute + ":";
+    }
+
+    String[] parts = line.split("\\s");
+    if (parts != null && parts.length > 0) {
+      for (int i = 0; i < parts.length; i++) {
+        if (parts[i].startsWith(attribute)) {
+          String val = parts[i].substring(attribute.length());
+          return val;
+        }
+      }
+    }
+    return null;
+  }
+
+  static long getTimestamp(String line) {
+    String timestamp = getAttributeValue(line, "time");
+    return Long.parseLong(timestamp);
+  }
+
+  /**
+   * parse a time string either in timestamp form or "yyMMdd_hhmmss_SSS" form
+   * @param time
+   * @return timestamp or -1 on error
+   */
+  static long parseTimeString(String time) {
+    try {
+      return Long.parseLong(time);
+    } catch (NumberFormatException e) {
+      try {
+        SimpleDateFormat formatter = new SimpleDateFormat("yyMMdd_hhmmss_SSS");
+        Date date = formatter.parse(time);
+        return date.getTime();
+      } catch (java.text.ParseException ex) {
+        LOG.error("fail to parse time string: " + time, e);
+      }
+    }
+    return -1;
+  }
+
+  public static void grepZkLog(File zkLog, long start, long end, String... patterns) {
+    FileInputStream fis = null;
+    BufferedReader br = null;
+    try {
+      fis = new FileInputStream(zkLog);
+      br = new BufferedReader(new InputStreamReader(fis));
+
+      String line;
+      while ((line = br.readLine()) != null) {
+        try {
+          long timestamp = getTimestamp(line);
+          if (timestamp > end) {
+            break;
+          }
+
+          if (timestamp < start) {
+            continue;
+          }
+
+          boolean match = true;
+          for (String pattern : patterns) {
+            if (line.indexOf(pattern) == -1) {
+              match = false;
+              break;
+            }
+          }
+
+          if (match) {
+            System.out.println(line);
+          }
+
+        } catch (NumberFormatException e) {
+          // ignore
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("exception in grep zk-log: " + zkLog, e);
+    } finally {
+      try {
+        if (br != null) {
+          br.close();
+        }
+
+        if (fis != null) {
+          fis.close();
+        }
+
+      } catch (Exception e) {
+        LOG.error("exception in closing zk-log: " + zkLog, e);
+      }
+    }
+  }
+
+  public static void grepZkLogDir(List<File> parsedZkLogs, long start, long end, String... patterns) {
+    for (File file : parsedZkLogs) {
+      grepZkLog(file, start, end, patterns);
+
+    }
+
+  }
+
+  public static void grepZkSnapshot(File zkSnapshot, String... patterns) {
+    FileInputStream fis = null;
+    BufferedReader br = null;
+    try {
+      fis = new FileInputStream(zkSnapshot);
+      br = new BufferedReader(new InputStreamReader(fis));
+
+      String line;
+      while ((line = br.readLine()) != null) {
+        try {
+          boolean match = true;
+          for (String pattern : patterns) {
+            if (line.indexOf(pattern) == -1) {
+              match = false;
+              break;
+            }
+          }
+
+          if (match) {
+            System.out.println(line);
+          }
+
+        } catch (NumberFormatException e) {
+          // ignore
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("exception in grep zk-snapshot: " + zkSnapshot, e);
+    } finally {
+      try {
+        if (br != null) {
+          br.close();
+        }
+
+        if (fis != null) {
+          fis.close();
+        }
+
+      } catch (Exception e) {
+        LOG.error("exception in closing zk-snapshot: " + zkSnapshot, e);
+      }
+    }
+  }
+
+  /**
+   * guess zoo.cfg dir
+   * @return absolute path to zoo.cfg
+   */
+  static String guessZkCfgDir() {
+    // TODO impl this
+    return null;
+  }
+
+  public static void printUsage(Options cliOptions) {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.setWidth(1000);
+    helpFormatter.printHelp("java " + ZkGrep.class.getName(), cliOptions);
+  }
+
+  /**
+   * parse zk-transaction-logs between start and end, if not already parsed
+   * @param zkLogDir
+   * @param start
+   * @param end
+   * @return list of parsed zklogs between start and end, in order of last modified timestamp
+   */
+  static List<File> parseZkLogs(String zkLogDir, long start, long end) {
+    File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home")));
+    File[] zkLogs = getSortedFiles(zkLogDir, log);
+    // printFiles(zkDataFiles);
+    List<File> parsedZkLogs = new ArrayList<File>();
+
+    boolean stop = false;
+    for (File zkLog : zkLogs) {
+      if (stop) {
+        break;
+      }
+
+      if (zkLog.lastModified() < start) {
+        continue;
+      }
+
+      if (zkLog.lastModified() > end) {
+        stop = true;
+      }
+
+      try {
+        File parsedZkLog = new File(zkParsedDir, stripGzSuffix(zkLog.getName()) + ".parsed");
+        if (!parsedZkLog.exists() || parsedZkLog.lastModified() <= zkLog.lastModified()) {
+
+          if (zkLog.getName().endsWith(gzSuffix)) {
+            // copy and gunzip it
+            FileUtils.copyFileToDirectory(zkLog, zkParsedDir);
+            File zkLogGz = new File(zkParsedDir, zkLog.getName());
+            File tmpZkLog = gunzip(zkLogGz);
+
+            // parse gunzip file
+            ZKLogFormatter.main(new String[] {
+                log, tmpZkLog.getAbsolutePath(), parsedZkLog.getAbsolutePath()
+            });
+
+            // delete it
+            zkLogGz.delete();
+            tmpZkLog.delete();
+          } else {
+            // parse it directly
+            ZKLogFormatter.main(new String[] {
+                log, zkLog.getAbsolutePath(), parsedZkLog.getAbsolutePath()
+            });
+          }
+        }
+        parsedZkLogs.add(parsedZkLog);
+      } catch (Exception e) {
+        LOG.error("fail to parse zkLog: " + zkLog, e);
+      }
+    }
+
+    return parsedZkLogs;
+  }
+
+  /**
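+   * Strip off a .gz suffix if any
+   * @param filename file name, with or without a trailing .gz
+   * @return filename without the trailing .gz; unchanged if it has none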
+   */
+  static String stripGzSuffix(String filename) {
+    if (filename.endsWith(gzSuffix)) {
+      return filename.substring(0, filename.length() - gzSuffix.length());
+    }
+    return filename;
+  }
+
+  /**
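+   * Gunzip a file into a sibling file with the .gz suffix stripped
+   * @param zipFile gzip-compressed input file
+   * @return the decompressed output file, or null if an IOException occurs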
+   */
+  static File gunzip(File zipFile) {
+    File outputFile = new File(stripGzSuffix(zipFile.getAbsolutePath()));
+
+    byte[] buffer = new byte[1024];
+
+    try {
+
+      GZIPInputStream gzis = new GZIPInputStream(new FileInputStream(zipFile));
+      FileOutputStream out = new FileOutputStream(outputFile);
+
+      int len;
+      while ((len = gzis.read(buffer)) > 0) {
+        out.write(buffer, 0, len);
+      }
+
+      gzis.close();
+      out.close();
+
+      return outputFile;
+    } catch (IOException e) {
+      LOG.error("fail to gunzip file: " + zipFile, e);
+    }
+
+    return null;
+  }
+
+  /**
+   * parse the last zk-snapshot modified before by-time, if not already parsed
+   * @param zkSnapshotDir
+   * @param byTime
+   * @return File array: [0] is the last zk-snapshot modified before by-time, [1] is its parsed
+   *         file; null if parsing fails
+   */
+  static File[] parseZkSnapshot(String zkSnapshotDir, long byTime) {
+    File[] retFiles = new File[2];
+    File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home")));
+    File[] zkSnapshots = getSortedFiles(zkSnapshotDir, snapshot);
+    // printFiles(zkDataFiles);
+    File lastZkSnapshot = null;
+    for (int i = 0; i < zkSnapshots.length; i++) {
+      File zkSnapshot = zkSnapshots[i];
+      if (zkSnapshot.lastModified() >= byTime) {
+        break;
+      }
+      lastZkSnapshot = zkSnapshot;
+      retFiles[0] = lastZkSnapshot;
+    }
+
+    try {
+      File parsedZkSnapshot =
+          new File(zkParsedDir, stripGzSuffix(lastZkSnapshot.getName()) + ".parsed");
+      if (!parsedZkSnapshot.exists()
+          || parsedZkSnapshot.lastModified() <= lastZkSnapshot.lastModified()) {
+
+        if (lastZkSnapshot.getName().endsWith(gzSuffix)) {
+          // copy and gunzip it
+          FileUtils.copyFileToDirectory(lastZkSnapshot, zkParsedDir);
+          File lastZkSnapshotGz = new File(zkParsedDir, lastZkSnapshot.getName());
+          File tmpLastZkSnapshot = gunzip(lastZkSnapshotGz);
+
+          // parse gunzip file
+          ZKLogFormatter.main(new String[] {
+              snapshot, tmpLastZkSnapshot.getAbsolutePath(), parsedZkSnapshot.getAbsolutePath()
+          });
+
+          // delete it
+          lastZkSnapshotGz.delete();
+          tmpLastZkSnapshot.delete();
+        } else {
+          // parse it directly
+          ZKLogFormatter.main(new String[] {
+              snapshot, lastZkSnapshot.getAbsolutePath(), parsedZkSnapshot.getAbsolutePath()
+          });
+        }
+
+      }
+      retFiles[1] = parsedZkSnapshot;
+      return retFiles;
+    } catch (Exception e) {
+      LOG.error("fail to parse zkSnapshot: " + lastZkSnapshot, e);
+    }
+
+    return null;
+  }
+
+  public static void processCommandLineArgs(String[] cliArgs) {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    CommandLine cmd = null;
+
+    try {
+      cmd = cliParser.parse(cliOptions, cliArgs);
+    } catch (ParseException pe) {
+      System.err.println("CommandLineClient: failed to parse command-line options: " + pe);
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+
+    String zkCfgDirValue = null;
+    String zkCfgFile = null;
+
+    if (cmd.hasOption(zkCfg)) {
+      zkCfgDirValue = cmd.getOptionValue(zkCfg);
+    }
+
+    if (zkCfgDirValue == null) {
+      zkCfgDirValue = guessZkCfgDir();
+    }
+
+    if (zkCfgDirValue == null) {
+      LOG.error("couldn't figure out path to zkCfg file");
+      System.exit(1);
+    }
+
+    // get zoo.cfg path from cfg-dir
+    zkCfgFile = zkCfgDirValue;
+    if (!zkCfgFile.endsWith(".cfg")) {
+      // append with default zoo.cfg
+      zkCfgFile = zkCfgFile + "/zoo.cfg";
+    }
+
+    if (!new File(zkCfgFile).exists()) {
+      LOG.error("zoo.cfg file doen't exist: " + zkCfgFile);
+      System.exit(1);
+    }
+
+    String[] patterns = cmd.getOptionValues(pattern);
+
+    String[] zkDataDirs = getZkDataDirs(zkCfgFile);
+
+    // parse zk data files
+    if (zkDataDirs == null || zkDataDirs[0] == null || zkDataDirs[1] == null) {
+      LOG.error("invalid zkCfgDir: " + zkCfgDirValue);
+      System.exit(1);
+    }
+
+    File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home")));
+    if (!zkParsedDir.exists()) {
+      LOG.info("creating zklog-parsed dir: " + zkParsedDir.getAbsolutePath());
+      zkParsedDir.mkdir();
+    }
+
+    if (cmd.hasOption(between)) {
+      String[] timeStrings = cmd.getOptionValues(between);
+
+      long startTime = parseTimeString(timeStrings[0]);
+      if (startTime == -1) {
+        LOG.error("invalid start time string: " + timeStrings[0]
+            + ", should be either timestamp or yyMMdd_hhmmss_SSS");
+        System.exit(1);
+      }
+
+      long endTime = parseTimeString(timeStrings[1]);
+      if (endTime == -1) {
+        LOG.error("invalid end time string: " + timeStrings[1]
+            + ", should be either timestamp or yyMMdd_hhmmss_SSS");
+        System.exit(1);
+      }
+
+      if (startTime > endTime) {
+        LOG.warn("empty window: " + startTime + " - " + endTime);
+        System.exit(1);
+      }
+      // zkDataDirs[0] is the transaction log dir
+      List<File> parsedZkLogs = parseZkLogs(zkDataDirs[0], startTime, endTime);
+      grepZkLogDir(parsedZkLogs, startTime, endTime, patterns);
+
+    } else if (cmd.hasOption(by)) {
+      String timeString = cmd.getOptionValue(by);
+
+      long byTime = parseTimeString(timeString);
+      if (byTime == -1) {
+        LOG.error("invalid by time string: " + timeString
+            + ", should be either timestamp or yyMMdd_hhmmss_SSS");
+        System.exit(1);
+      }
+
+      // zkDataDirs[1] is the snapshot dir
+      File[] lastZkSnapshot = parseZkSnapshot(zkDataDirs[1], byTime);
+
+      // lastZkSnapshot[1] is the parsed last snapshot by byTime
+      grepZkSnapshot(lastZkSnapshot[1], patterns);
+
+      // need to grep transaction logs between last-modified-time of snapshot and byTime also
+      // lastZkSnapshot[0] is the last snapshot by byTime
+      long startTime = lastZkSnapshot[0].lastModified();
+
+      // zkDataDirs[0] is the transaction log dir
+      List<File> parsedZkLogs = parseZkLogs(zkDataDirs[0], startTime, byTime);
+      grepZkLogDir(parsedZkLogs, startTime, byTime, patterns);
+    }
+  }
+
+  public static void main(String[] args) {
+    processCommandLineArgs(args);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/efc1defd/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java b/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
deleted file mode 100644
index 11e1b66..0000000
--- a/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
+++ /dev/null
@@ -1,441 +0,0 @@
-package org.apache.helix.tools;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.log4j.Logger;
-
-public class ZkLogAnalyzer {
-  private static Logger LOG = Logger.getLogger(ZkLogAnalyzer.class);
-  private static boolean dump = false;;
-  final static ZNRecordSerializer _deserializer = new ZNRecordSerializer();
-
-  static class Stats {
-    int msgSentCount = 0;
-    int msgSentCount_O2S = 0; // Offline to Slave
-    int msgSentCount_S2M = 0; // Slave to Master
-    int msgSentCount_M2S = 0; // Master to Slave
-    int msgDeleteCount = 0;
-    int msgModifyCount = 0;
-    int curStateCreateCount = 0;
-    int curStateUpdateCount = 0;
-    int extViewCreateCount = 0;
-    int extViewUpdateCount = 0;
-  }
-
-  static String getAttributeValue(String line, String attribute) {
-    if (line == null)
-      return null;
-    String[] parts = line.split("\\s");
-    if (parts != null && parts.length > 0) {
-      for (int i = 0; i < parts.length; i++) {
-        if (parts[i].startsWith(attribute)) {
-          String val = parts[i].substring(attribute.length());
-          return val;
-        }
-      }
-    }
-    return null;
-  }
-
-  static String findLastCSUpdateBetween(List<String> csUpdateLines, long start, long end) {
-    long lastCSUpdateTimestamp = Long.MIN_VALUE;
-    String lastCSUpdateLine = null;
-    for (String line : csUpdateLines) {
-      // ZNRecord record = getZNRecord(line);
-      long timestamp = Long.parseLong(getAttributeValue(line, "time:"));
-      if (timestamp >= start && timestamp <= end && timestamp > lastCSUpdateTimestamp) {
-        lastCSUpdateTimestamp = timestamp;
-        lastCSUpdateLine = line;
-      }
-    }
-    assert (lastCSUpdateLine != null) : "No CS update between " + start + " - " + end;
-    return lastCSUpdateLine;
-  }
-
-  static ZNRecord getZNRecord(String line) {
-    ZNRecord record = null;
-    String value = getAttributeValue(line, "data:");
-    if (value != null) {
-      record = (ZNRecord) _deserializer.deserialize(value.getBytes());
-      // if (record == null)
-      // {
-      // System.out.println(line);
-      // }
-    }
-    return record;
-  }
-
-  public static void main(String[] args) throws Exception {
-    if (args.length != 3) {
-      System.err
-          .println("USAGE: ZkLogAnalyzer zkLogDir clusterName testStartTime (yyMMdd_hhmmss_SSS)");
-      System.exit(1);
-    }
-
-    System.out.println("ZkLogAnalyzer called with args: " + Arrays.toString(args));
-    // get create-timestamp of "/" + clusterName
-    // find all zk logs after that create-timestamp and parse them
-    // save parsed log in /tmp/zkLogAnalyzor_zklog.parsed0,1,2...
-
-    String zkLogDir = args[0];
-    String clusterName = args[1];
-    // String zkAddr = args[2];
-    String startTimeStr = args[2];
-    // ZkClient zkClient = new ZkClient(zkAddr);
-    // Stat clusterCreateStat = zkClient.getStat("/" + clusterName);
-    SimpleDateFormat formatter = new SimpleDateFormat("yyMMdd_hhmmss_SSS");
-    Date date = formatter.parse(startTimeStr);
-    long startTimeStamp = date.getTime();
-
-    System.out.println(clusterName + " created at " + date);
-    while (zkLogDir.endsWith("/")) {
-      zkLogDir = zkLogDir.substring(0, zkLogDir.length() - 1);
-    }
-    if (!zkLogDir.endsWith("/version-2")) {
-      zkLogDir = zkLogDir + "/version-2";
-    }
-    File dir = new File(zkLogDir);
-    File[] zkLogs = dir.listFiles(new FileFilter() {
-
-      @Override
-      public boolean accept(File file) {
-        return file.isFile() && (file.getName().indexOf("log") != -1);
-      }
-    });
-
-    // lastModified time -> zkLog
-    TreeMap<Long, String> lastZkLogs = new TreeMap<Long, String>();
-    for (File file : zkLogs) {
-      if (file.lastModified() > startTimeStamp) {
-        lastZkLogs.put(file.lastModified(), file.getAbsolutePath());
-      }
-    }
-
-    List<String> parsedZkLogs = new ArrayList<String>();
-    int i = 0;
-    System.out.println("zk logs last modified later than " + new Timestamp(startTimeStamp));
-    for (Long lastModified : lastZkLogs.keySet()) {
-      String fileName = lastZkLogs.get(lastModified);
-      System.out.println(new Timestamp(lastModified) + ": "
-          + (fileName.substring(fileName.lastIndexOf('/') + 1)));
-
-      String parsedFileName = "zkLogAnalyzor_zklog.parsed" + i;
-      i++;
-      ZKLogFormatter.main(new String[] {
-          "log", fileName, parsedFileName
-      });
-      parsedZkLogs.add(parsedFileName);
-    }
-
-    // sessionId -> create liveInstance line
-    Map<String, String> sessionMap = new HashMap<String, String>();
-
-    // message send lines in time order
-    // List<String> sendMessageLines = new ArrayList<String>();
-
-    // CS update lines in time order
-    List<String> csUpdateLines = new ArrayList<String>();
-
-    String leaderSession = null;
-
-    System.out.println();
-    Stats stats = new Stats();
-    long lastTestStartTimestamp = Long.MAX_VALUE;
-    long controllerStartTime = 0;
-    for (String parsedZkLog : parsedZkLogs) {
-
-      FileInputStream fis = new FileInputStream(parsedZkLog);
-      BufferedReader br = new BufferedReader(new InputStreamReader(fis));
-
-      String inputLine;
-      while ((inputLine = br.readLine()) != null) {
-        String timestamp = getAttributeValue(inputLine, "time:");
-        if (timestamp == null) {
-          continue;
-        }
-        long timestampVal = Long.parseLong(timestamp);
-        if (timestampVal < startTimeStamp) {
-          continue;
-        }
-
-        if (dump == true) {
-          String printLine = inputLine.replaceAll("data:.*", "");
-          printLine =
-              new Timestamp(timestampVal) + " "
-                  + printLine.substring(printLine.indexOf("session:"));
-          System.err.println(printLine);
-        }
-
-        if (inputLine.indexOf("/start_disable") != -1) {
-          dump = true;
-        }
-        if (inputLine.indexOf("/" + clusterName + "/CONFIGS/CLUSTER/verify") != -1) {
-          String type = getAttributeValue(inputLine, "type:");
-          if (type.equals("delete")) {
-            System.out.println(timestamp + ": verify done");
-            System.out.println("lastTestStartTimestamp:" + lastTestStartTimestamp);
-            String lastCSUpdateLine =
-                findLastCSUpdateBetween(csUpdateLines, lastTestStartTimestamp, timestampVal);
-            long lastCSUpdateTimestamp =
-                Long.parseLong(getAttributeValue(lastCSUpdateLine, "time:"));
-            System.out.println("Last CS Update:" + lastCSUpdateTimestamp);
-
-            System.out.println("state transition latency: "
-                + +(lastCSUpdateTimestamp - lastTestStartTimestamp) + "ms");
-
-            System.out.println("state transition latency since controller start: "
-                + +(lastCSUpdateTimestamp - controllerStartTime) + "ms");
-
-            System.out.println("Create MSG\t" + stats.msgSentCount + "\t" + stats.msgSentCount_O2S
-                + "\t" + stats.msgSentCount_S2M + "\t" + stats.msgSentCount_M2S);
-            System.out.println("Modify MSG\t" + stats.msgModifyCount);
-            System.out.println("Delete MSG\t" + stats.msgDeleteCount);
-            System.out.println("Create CS\t" + stats.curStateCreateCount);
-            System.out.println("Update CS\t" + stats.curStateUpdateCount);
-            System.out.println("Create EV\t" + stats.extViewCreateCount);
-            System.out.println("Update EV\t" + stats.extViewUpdateCount);
-
-            System.out.println();
-            stats = new Stats();
-            lastTestStartTimestamp = Long.MAX_VALUE;
-          }
-        } else if (inputLine.indexOf("/" + clusterName + "/LIVEINSTANCES/") != -1) {
-          // cluster startup
-          if (timestampVal < lastTestStartTimestamp) {
-            System.out.println("START cluster. SETTING lastTestStartTimestamp to "
-                + new Timestamp(timestampVal) + "\nline:" + inputLine);
-            lastTestStartTimestamp = timestampVal;
-          }
-
-          ZNRecord record = getZNRecord(inputLine);
-          LiveInstance liveInstance = new LiveInstance(record);
-          String session = getAttributeValue(inputLine, "session:");
-          sessionMap.put(session, inputLine);
-          System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LIVEINSTANCE "
-              + liveInstance.getInstanceName());
-        } else if (inputLine.indexOf("closeSession") != -1) {
-          // kill any instance
-          String session = getAttributeValue(inputLine, "session:");
-          if (sessionMap.containsKey(session)) {
-            if (timestampVal < lastTestStartTimestamp) {
-              System.out.println("KILL node. SETTING lastTestStartTimestamp to " + timestampVal
-                  + " line:" + inputLine);
-              lastTestStartTimestamp = timestampVal;
-            }
-            String line = sessionMap.get(session);
-            ZNRecord record = getZNRecord(line);
-            LiveInstance liveInstance = new LiveInstance(record);
-
-            System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": close session "
-                + liveInstance.getInstanceName());
-            dump = true;
-          }
-        } else if (inputLine.indexOf("/" + clusterName + "/CONFIGS/PARTICIPANT") != -1) {
-          // disable a partition
-          String type = getAttributeValue(inputLine, "type:");
-          if (type.equals("setData") && inputLine.indexOf("HELIX_DISABLED_PARTITION") != -1) {
-            if (timestampVal < lastTestStartTimestamp) {
-              System.out.println("DISABLE partition. SETTING lastTestStartTimestamp to "
-                  + timestampVal + " line:" + inputLine);
-              lastTestStartTimestamp = timestampVal;
-            }
-          }
-        } else if (inputLine.indexOf("/" + clusterName + "/CONTROLLER/LEADER") != -1) {
-          // leaderLine = inputLine;
-          ZNRecord record = getZNRecord(inputLine);
-          LiveInstance liveInstance = new LiveInstance(record);
-          String session = getAttributeValue(inputLine, "session:");
-          leaderSession = session;
-          controllerStartTime = Long.parseLong(getAttributeValue(inputLine, "time:"));
-          sessionMap.put(session, inputLine);
-          System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LEADER "
-              + liveInstance.getInstanceName());
-        } else if (inputLine.indexOf("/" + clusterName + "/") != -1
-            && inputLine.indexOf("/CURRENTSTATES/") != -1) {
-          String type = getAttributeValue(inputLine, "type:");
-          if (type.equals("create")) {
-            stats.curStateCreateCount++;
-          } else if (type.equals("setData")) {
-            String path = getAttributeValue(inputLine, "path:");
-            csUpdateLines.add(inputLine);
-            stats.curStateUpdateCount++;
-            // getAttributeValue(line, "data");
-            System.out.println("Update currentstate:" + new Timestamp(Long.parseLong(timestamp))
-                + ":" + timestamp + " path:" + path);
-          }
-        } else if (inputLine.indexOf("/" + clusterName + "/EXTERNALVIEW/") != -1) {
-          String session = getAttributeValue(inputLine, "session:");
-          if (session.equals(leaderSession)) {
-            String type = getAttributeValue(inputLine, "type:");
-            if (type.equals("create")) {
-              stats.extViewCreateCount++;
-            } else if (type.equals("setData")) {
-              stats.extViewUpdateCount++;
-            }
-          }
-
-          // pos = inputLine.indexOf("EXTERNALVIEW");
-          // pos = inputLine.indexOf("data:{", pos);
-          // if (pos != -1)
-          // {
-          // String timestamp = getAttributeValue(inputLine, "time:");
-          // ZNRecord record =
-          // (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
-          // .getBytes());
-          // ExternalView extView = new ExternalView(record);
-          // int masterCnt = ClusterStateVerifier.countStateNbInExtView(extView,
-          // "MASTER");
-          // int slaveCnt = ClusterStateVerifier.countStateNbInExtView(extView, "SLAVE");
-          // if (masterCnt == 1200)
-          // {
-          // System.out.println(timestamp + ": externalView " + extView.getResourceName()
-          // + " has " + masterCnt + " MASTER, " + slaveCnt + " SLAVE");
-          // }
-          // }
-        } else if (inputLine.indexOf("/" + clusterName + "/") != -1
-            && inputLine.indexOf("/MESSAGES/") != -1) {
-          String type = getAttributeValue(inputLine, "type:");
-
-          if (type.equals("create")) {
-            ZNRecord record = getZNRecord(inputLine);
-            Message msg = new Message(record);
-            String sendSession = getAttributeValue(inputLine, "session:");
-            if (sendSession.equals(leaderSession) && msg.getMsgType().equals("STATE_TRANSITION")
-                && msg.getMsgState() == MessageState.NEW) {
-              // sendMessageLines.add(inputLine);
-              stats.msgSentCount++;
-
-              if (msg.getFromState().equals("OFFLINE") && msg.getToState().equals("SLAVE")) {
-                stats.msgSentCount_O2S++;
-              } else if (msg.getFromState().equals("SLAVE") && msg.getToState().equals("MASTER")) {
-                stats.msgSentCount_S2M++;
-              } else if (msg.getFromState().equals("MASTER") && msg.getToState().equals("SLAVE")) {
-                stats.msgSentCount_M2S++;
-              }
-              // System.out.println("Message create:"+new
-              // Timestamp(Long.parseLong(timestamp)));
-            }
-
-            // pos = inputLine.indexOf("MESSAGES");
-            // pos = inputLine.indexOf("data:{", pos);
-            // if (pos != -1)
-            // {
-            //
-            // byte[] msgBytes = inputLine.substring(pos + 5).getBytes();
-            // ZNRecord record = (ZNRecord) _deserializer.deserialize(msgBytes);
-            // Message msg = new Message(record);
-            // MessageState msgState = msg.getMsgState();
-            // String msgType = msg.getMsgType();
-            // if (msgType.equals("STATE_TRANSITION") && msgState == MessageState.NEW)
-            // {
-            // if (!msgs.containsKey(msg.getMsgId()))
-            // {
-            // msgs.put(msg.getMsgId(), new MsgItem(Long.parseLong(timestamp), msg));
-            // }
-            // else
-            // {
-            // LOG.error("msg: " + msg.getMsgId() + " already sent");
-            // }
-            //
-            // System.out.println(timestamp + ": sendMsg " + msg.getPartitionName() + "("
-            // + msg.getFromState() + "->" + msg.getToState() + ") to "
-            // + msg.getTgtName() + ", size: " + msgBytes.length);
-            // }
-            // }
-          } else if (type.equals("setData")) {
-            stats.msgModifyCount++;
-            // pos = inputLine.indexOf("MESSAGES");
-            // pos = inputLine.indexOf("data:{", pos);
-            // if (pos != -1)
-            // {
-            //
-            // byte[] msgBytes = inputLine.substring(pos + 5).getBytes();
-            // ZNRecord record = (ZNRecord) _deserializer.deserialize(msgBytes);
-            // Message msg = new Message(record);
-            // MessageState msgState = msg.getMsgState();
-            // String msgType = msg.getMsgType();
-            // if (msgType.equals("STATE_TRANSITION") && msgState == MessageState.READ)
-            // {
-            // if (!msgs.containsKey(msg.getMsgId()))
-            // {
-            // LOG.error("msg: " + msg.getMsgId() + " never sent");
-            // }
-            // else
-            // {
-            // MsgItem msgItem = msgs.get(msg.getMsgId());
-            // if (msgItem.readTime == 0)
-            // {
-            // msgItem.readTime = Long.parseLong(timestamp);
-            // msgs.put(msg.getMsgId(), msgItem);
-            // // System.out.println(timestamp + ": readMsg " + msg.getPartitionName()
-            // // + "("
-            // // + msg.getFromState() + "->" + msg.getToState() + ") to "
-            // // + msg.getTgtName() + ", latency: " + (msgItem.readTime -
-            // // msgItem.sendTime));
-            // }
-            // }
-            //
-            // }
-            // }
-          } else if (type.equals("delete")) {
-            stats.msgDeleteCount++;
-            // String msgId = path.substring(path.lastIndexOf('/') + 1);
-            // if (msgs.containsKey(msgId))
-            // {
-            // MsgItem msgItem = msgs.get(msgId);
-            // Message msg = msgItem.msg;
-            // msgItem.deleteTime = Long.parseLong(timestamp);
-            // msgs.put(msgId, msgItem);
-            // msgItem.latency = msgItem.deleteTime - msgItem.sendTime;
-            // System.out.println(timestamp + ": delMsg " + msg.getPartitionName() + "("
-            // + msg.getFromState() + "->" + msg.getToState() + ") to "
-            // + msg.getTgtName() + ", latency: " + msgItem.latency);
-            // }
-            // else
-            // {
-            // // messages other than STATE_TRANSITION message
-            // // LOG.error("msg: " + msgId + " never sent");
-            // }
-          }
-        }
-      } // end of [br.readLine()) != null]
-    }
-  }
-}


[2/3] git commit: [HELIX-22] Remove dependency on josql, rb=16017

Posted by ka...@apache.org.
[HELIX-22] Remove dependency on josql, rb=16017


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5019d96e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5019d96e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5019d96e

Branch: refs/heads/helix-0.6.2-release
Commit: 5019d96e233a905c5eb640f4a8b6d14af839e06d
Parents: efc1def
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Dec 4 13:00:20 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Mar 13 18:55:19 2014 -0700

----------------------------------------------------------------------
 helix-core/pom.xml                              |   6 -
 .../main/java/org/apache/helix/Criteria.java    |  12 +-
 .../helix/josql/ClusterJosqlQueryProcessor.java | 278 -------------------
 .../josql/ZNRecordJosqlFunctionHandler.java     |  78 ------
 .../org/apache/helix/josql/ZNRecordRow.java     | 174 ------------
 .../org/apache/helix/josql/package-info.java    |  23 --
 .../helix/messaging/CriteriaEvaluator.java      | 156 +++++++----
 .../org/apache/helix/messaging/ZNRecordRow.java | 193 +++++++++++++
 .../josql/TestClusterJosqlQueryProcessor.java   | 102 -------
 .../apache/helix/josql/TestJosqlProcessor.java  | 224 ---------------
 .../messaging/TestDefaultMessagingService.java  |  39 ++-
 pom.xml                                         |   5 -
 12 files changed, 335 insertions(+), 955 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index cc6b86e..ee09bf9 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -45,7 +45,6 @@ under the License.
       org.apache.zookeeper.txn*;resolution:=optional,
       org.apache.zookeeper*;version="[3.3,4)",
       org.codehaus.jackson*;version="[1.8,2)",
-      org.josql*;version="[1.5,2)",
       org.restlet;version="[2.1.4,3]",
       *
     </osgi.import>
@@ -116,11 +115,6 @@ under the License.
       <version>0.1</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.camel</groupId>
-      <artifactId>camel-josql</artifactId>
-      <version>2.12.1</version>
-    </dependency>
-    <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/main/java/org/apache/helix/Criteria.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/Criteria.java b/helix-core/src/main/java/org/apache/helix/Criteria.java
index da2b36e..75781e1 100644
--- a/helix-core/src/main/java/org/apache/helix/Criteria.java
+++ b/helix-core/src/main/java/org/apache/helix/Criteria.java
@@ -39,16 +39,16 @@ public class Criteria {
    */
   boolean sessionSpecific;
   /**
-   * applicable only in case PARTICIPANT use * to broadcast to all instances
+   * applicable only in case PARTICIPANT use % to broadcast to all instances
    */
   String instanceName = "";
   /**
-   * Name of the resource. Use * to send message to all resources
+   * Name of the resource. Use % to send message to all resources
    * owned by an instance.
    */
   String resourceName = "";
   /**
-   * Resource partition. Use * to send message to all partitions of a given
+   * Resource partition. Use % to send message to all partitions of a given
    * resource
    */
   String partitionName = "";
@@ -140,7 +140,7 @@ public class Criteria {
 
   /**
    * Set the name of the destination instance (PARTICIPANT only)
-   * @param instanceName the instance name or * for all instances
+   * @param instanceName the instance name or % for all instances
    */
   public void setInstanceName(String instanceName) {
     this.instanceName = instanceName;
@@ -156,7 +156,7 @@ public class Criteria {
 
   /**
    * Set the destination resource name
-   * @param resourceName the resource name or * for all resources
+   * @param resourceName the resource name or % for all resources
    */
   public void setResource(String resourceName) {
     this.resourceName = resourceName;
@@ -172,7 +172,7 @@ public class Criteria {
 
   /**
    * Set the destination partition name
-   * @param partitionName the partition name, or * for all partitions of a resource
+   * @param partitionName the partition name, or % for all partitions of a resource
    */
   public void setPartition(String partitionName) {
     this.partitionName = partitionName;

http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java b/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java
deleted file mode 100644
index 8f11de5..0000000
--- a/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java
+++ /dev/null
@@ -1,278 +0,0 @@
-package org.apache.helix.josql;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
-import org.apache.log4j.Logger;
-import org.josql.Query;
-import org.josql.QueryExecutionException;
-import org.josql.QueryParseException;
-import org.josql.QueryResults;
-
-public class ClusterJosqlQueryProcessor {
-  public static final String PARTITIONS = "PARTITIONS";
-  public static final String FLATTABLE = ".Table";
-
-  HelixManager _manager;
-  private static Logger _logger = Logger.getLogger(ClusterJosqlQueryProcessor.class);
-
-  public ClusterJosqlQueryProcessor(HelixManager manager) {
-    _manager = manager;
-  }
-
-  String parseFromTarget(String sql) {
-    // We need to find out the "FROM" target, and replace it with liveInstances
-    // / partitions etc
-    int fromIndex = sql.indexOf("FROM");
-    if (fromIndex == -1) {
-      throw new HelixException("Query must contain FROM target. Query: " + sql);
-    }
-    // Per JoSql, select FROM <target> the target must be a object class that
-    // corresponds to a "table row"
-    // In out case, the row is always a ZNRecord
-
-    int nextSpace = sql.indexOf(" ", fromIndex);
-    while (sql.charAt(nextSpace) == ' ') {
-      nextSpace++;
-    }
-    int nextnextSpace = sql.indexOf(" ", nextSpace);
-    if (nextnextSpace == -1) {
-      nextnextSpace = sql.length();
-    }
-    String fromTarget = sql.substring(nextSpace, nextnextSpace).trim();
-
-    if (fromTarget.length() == 0) {
-      throw new HelixException("FROM target in the query cannot be empty. Query: " + sql);
-    }
-    return fromTarget;
-  }
-
-  public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
-      List<Object> additionalFunctionHandlers, List queryTarget) throws QueryParseException,
-      QueryExecutionException {
-    Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
-
-    josqlQuery.parse(josql);
-    QueryResults qr = josqlQuery.execute(queryTarget);
-
-    return qr.getResults();
-  }
-
-  Query prepareQuery(Map<String, Object> bindVariables, List<Object> additionalFunctionHandlers) {
-    // DataAccessor accessor = _manager.getDataAccessor();
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
-    // Get all the ZNRecords in the cluster and set them as bind variables
-    Builder keyBuilder = accessor.keyBuilder();
-    // List<ZNRecord> instanceConfigs = accessor.getChildValues(PropertyType.CONFIGS,
-    // ConfigScopeProperty.PARTICIPANT.toString());
-
-    List<ZNRecord> instanceConfigs =
-        HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs()));
-
-    List<ZNRecord> liveInstances =
-        HelixProperty.convertToList(accessor.getChildValues(keyBuilder.liveInstances()));
-    List<ZNRecord> stateModelDefs =
-        HelixProperty.convertToList(accessor.getChildValues(keyBuilder.stateModelDefs()));
-
-    // Idealstates are stored in a map from resource name to idealState ZNRecord
-    List<ZNRecord> idealStateList =
-        HelixProperty.convertToList(accessor.getChildValues(keyBuilder.idealStates()));
-
-    Map<String, ZNRecord> idealStatesMap = new HashMap<String, ZNRecord>();
-    for (ZNRecord idealState : idealStateList) {
-      idealStatesMap.put(idealState.getId(), idealState);
-    }
-    // Make up the partition list: for selecting partitions
-    List<ZNRecord> partitions = new ArrayList<ZNRecord>();
-    for (ZNRecord idealState : idealStateList) {
-      for (String partitionName : idealState.getMapFields().keySet()) {
-        partitions.add(new ZNRecord(partitionName));
-      }
-    }
-
-    List<ZNRecord> externalViewList =
-        HelixProperty.convertToList(accessor.getChildValues(keyBuilder.externalViews()));
-    // ExternalViews are stored in a map from resource name to idealState
-    // ZNRecord
-    Map<String, ZNRecord> externalViewMap = new HashMap<String, ZNRecord>();
-    for (ZNRecord externalView : externalViewList) {
-      externalViewMap.put(externalView.getId(), externalView);
-    }
-    // Map from instance name to a map from resource to current state ZNRecord
-    Map<String, Map<String, ZNRecord>> currentStatesMap =
-        new HashMap<String, Map<String, ZNRecord>>();
-    // Map from instance name to a list of combined flat ZNRecordRow
-    Map<String, List<ZNRecordRow>> flatCurrentStateMap = new HashMap<String, List<ZNRecordRow>>();
-
-    for (ZNRecord instance : liveInstances) {
-      String host = instance.getId();
-      String sessionId = instance.getSimpleField(LiveInstanceProperty.SESSION_ID.toString());
-      Map<String, ZNRecord> currentStates = new HashMap<String, ZNRecord>();
-      List<ZNRecord> instanceCurrentStateList = new ArrayList<ZNRecord>();
-      for (ZNRecord idealState : idealStateList) {
-        String resourceName = idealState.getId();
-
-        HelixProperty property =
-            accessor.getProperty(keyBuilder.currentState(host, sessionId, resourceName));
-        ZNRecord currentState = null;
-        if (property == null) {
-          _logger.warn("Resource " + resourceName + " has null currentState");
-          currentState = new ZNRecord(resourceName);
-        } else {
-          currentState = property.getRecord();
-        }
-        currentStates.put(resourceName, currentState);
-        instanceCurrentStateList.add(currentState);
-      }
-      currentStatesMap.put(host, currentStates);
-      flatCurrentStateMap.put(host, ZNRecordRow.flatten(instanceCurrentStateList));
-    }
-    Query josqlQuery = new Query();
-
-    // Set the default bind variables
-    josqlQuery.setVariable(
-        PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString(),
-        instanceConfigs);
-    josqlQuery.setVariable(PropertyType.IDEALSTATES.toString(), idealStatesMap);
-    josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString(), liveInstances);
-    josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString(), stateModelDefs);
-    josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString(), externalViewMap);
-    josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString(), currentStatesMap);
-    josqlQuery.setVariable(PARTITIONS, partitions);
-
-    // Flat version of ZNRecords
-    josqlQuery.setVariable(
-        PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString()
-            + FLATTABLE, ZNRecordRow.flatten(instanceConfigs));
-    josqlQuery.setVariable(PropertyType.IDEALSTATES.toString() + FLATTABLE,
-        ZNRecordRow.flatten(idealStateList));
-    josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString() + FLATTABLE,
-        ZNRecordRow.flatten(liveInstances));
-    josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString() + FLATTABLE,
-        ZNRecordRow.flatten(stateModelDefs));
-    josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString() + FLATTABLE,
-        ZNRecordRow.flatten(externalViewList));
-    josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString() + FLATTABLE,
-        flatCurrentStateMap.values());
-    josqlQuery.setVariable(PARTITIONS + FLATTABLE, ZNRecordRow.flatten(partitions));
-    // Set additional bind variables
-    if (bindVariables != null) {
-      for (String key : bindVariables.keySet()) {
-        josqlQuery.setVariable(key, bindVariables.get(key));
-      }
-    }
-
-    josqlQuery.addFunctionHandler(new ZNRecordJosqlFunctionHandler());
-    josqlQuery.addFunctionHandler(new ZNRecordRow());
-    josqlQuery.addFunctionHandler(new Integer(0));
-    if (additionalFunctionHandlers != null) {
-      for (Object functionHandler : additionalFunctionHandlers) {
-        josqlQuery.addFunctionHandler(functionHandler);
-      }
-    }
-    return josqlQuery;
-  }
-
-  public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
-      List<Object> additionalFunctionHandlers) throws QueryParseException, QueryExecutionException {
-    Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
-
-    // Per JoSql, select FROM <target> the target must be a object class that
-    // corresponds to a "table row",
-    // while the table (list of Objects) are put in the query by
-    // query.execute(List<Object>). In the input,
-    // In out case, the row is always a ZNRecord. But in SQL, the from target is
-    // a "table name".
-
-    String fromTargetString = parseFromTarget(josql);
-
-    List fromTargetList = null;
-    Object fromTarget = null;
-    if (fromTargetString.equalsIgnoreCase(PARTITIONS)) {
-      fromTarget = josqlQuery.getVariable(PARTITIONS.toString());
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.LIVEINSTANCES.toString())) {
-      fromTarget = josqlQuery.getVariable(PropertyType.LIVEINSTANCES.toString());
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.CONFIGS.toString() + "/"
-        + ConfigScopeProperty.PARTICIPANT.toString())) {
-      fromTarget =
-          josqlQuery.getVariable(PropertyType.CONFIGS.toString() + "/"
-              + ConfigScopeProperty.PARTICIPANT.toString());
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.STATEMODELDEFS.toString())) {
-      fromTarget = josqlQuery.getVariable(PropertyType.STATEMODELDEFS.toString());
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.EXTERNALVIEW.toString())) {
-      fromTarget = josqlQuery.getVariable(PropertyType.EXTERNALVIEW.toString());
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.IDEALSTATES.toString())) {
-      fromTarget = josqlQuery.getVariable(PropertyType.IDEALSTATES.toString());
-    }
-
-    else if (fromTargetString.equalsIgnoreCase(PARTITIONS + FLATTABLE)) {
-      fromTarget = josqlQuery.getVariable(PARTITIONS.toString() + FLATTABLE);
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.LIVEINSTANCES.toString() + FLATTABLE)) {
-      fromTarget = josqlQuery.getVariable(PropertyType.LIVEINSTANCES.toString() + FLATTABLE);
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.CONFIGS.toString() + "/"
-        + ConfigScopeProperty.PARTICIPANT.toString() + FLATTABLE)) {
-      fromTarget =
-          josqlQuery.getVariable(PropertyType.CONFIGS.toString() + "/"
-              + ConfigScopeProperty.PARTICIPANT.toString() + FLATTABLE);
-    } else if (fromTargetString
-        .equalsIgnoreCase(PropertyType.STATEMODELDEFS.toString() + FLATTABLE)) {
-      fromTarget = josqlQuery.getVariable(PropertyType.STATEMODELDEFS.toString() + FLATTABLE);
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.EXTERNALVIEW.toString() + FLATTABLE)) {
-      fromTarget = josqlQuery.getVariable(PropertyType.EXTERNALVIEW.toString() + FLATTABLE);
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.IDEALSTATES.toString() + FLATTABLE)) {
-      fromTarget = josqlQuery.getVariable(PropertyType.IDEALSTATES.toString() + FLATTABLE);
-    } else {
-      throw new HelixException(
-          "Unknown query target "
-              + fromTargetString
-              + ". Target should be PARTITIONS, LIVEINSTANCES, CONFIGS, STATEMODELDEFS, IDEALSTATES, EXTERNALVIEW, and corresponding flat Tables");
-    }
-
-    fromTargetList =
-        fromTargetString.endsWith(FLATTABLE) ? ((List<ZNRecordRow>) fromTarget)
-            : ((List<ZNRecord>) fromTarget);
-
-    // Per JoSql, select FROM <target> the target must be a object class that
-    // corresponds to a "table row"
-    // In out case, the row is always a ZNRecord
-    josql =
-        josql.replaceFirst(
-            fromTargetString,
-            fromTargetString.endsWith(FLATTABLE) ? ZNRecordRow.class.getName() : ZNRecord.class
-                .getName());
-    josqlQuery.parse(josql);
-    QueryResults qr = josqlQuery.execute(fromTargetList);
-    return qr.getResults();
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java b/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java
deleted file mode 100644
index f2fbddb..0000000
--- a/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package org.apache.helix.josql;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Map;
-
-import org.apache.helix.ZNRecord;
-import org.josql.functions.AbstractFunctionHandler;
-
-public class ZNRecordJosqlFunctionHandler extends AbstractFunctionHandler {
-  public boolean hasSimpleField(ZNRecord record, String fieldName, String field) {
-    if (!record.getSimpleFields().containsKey(fieldName)) {
-      return false;
-    }
-    return field.equals(record.getSimpleField(fieldName));
-  }
-
-  public boolean hasListField(ZNRecord record, String fieldName, String field) {
-    if (!record.getListFields().containsKey(fieldName)) {
-      return false;
-    }
-    return record.getListField(fieldName).contains(field);
-  }
-
-  public boolean hasMapFieldValue(ZNRecord record, String fieldName, String mapKey, String mapValue) {
-    if (!record.getMapFields().containsKey(fieldName)) {
-      return false;
-    }
-    if (record.getMapField(fieldName).containsKey(mapKey)) {
-      return record.getMapField(fieldName).get(mapKey).equals(mapValue);
-    }
-    return false;
-  }
-
-  public boolean hasMapFieldKey(ZNRecord record, String fieldName, String mapKey) {
-    if (!record.getMapFields().containsKey(fieldName)) {
-      return false;
-    }
-    return record.getMapField(fieldName).containsKey(mapKey);
-  }
-
-  public String getMapFieldValue(ZNRecord record, String fieldName, String mapKey) {
-    if (record.getMapFields().containsKey(fieldName)) {
-      return record.getMapField(fieldName).get(mapKey);
-    }
-    return null;
-  }
-
-  public String getSimpleFieldValue(ZNRecord record, String key) {
-    return record.getSimpleField(key);
-  }
-
-  public ZNRecord getZNRecordFromMap(Map<String, ZNRecord> recordMap, String key) {
-    return recordMap.get(key);
-  }
-
-  public ZNRecord getZNRecordFromMap(Map<String, Map<String, ZNRecord>> recordMap, String key,
-      String subKey) {
-    return recordMap.get(key).get(subKey);
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java b/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java
deleted file mode 100644
index 108596f..0000000
--- a/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java
+++ /dev/null
@@ -1,174 +0,0 @@
-package org.apache.helix.josql;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.ZNRecord;
-
-/**
- * A Normalized form of ZNRecord
- */
-public class ZNRecordRow {
-  // "Field names" in the flattened ZNRecord
-  public static final String SIMPLE_KEY = "simpleKey";
-  public static final String SIMPLE_VALUE = "simpleValue";
-
-  public static final String LIST_KEY = "listKey";
-  public static final String LIST_VALUE = "listValue";
-  public static final String LIST_VALUE_INDEX = "listValueIndex";
-
-  public static final String MAP_KEY = "mapKey";
-  public static final String MAP_SUBKEY = "mapSubKey";
-  public static final String MAP_VALUE = "mapValue";
-  public static final String ZNRECORD_ID = "recordId";
-  // ZNRECORD path ?
-
-  final Map<String, String> _rowDataMap = new HashMap<String, String>();
-
-  public ZNRecordRow() {
-    _rowDataMap.put(SIMPLE_KEY, "");
-    _rowDataMap.put(SIMPLE_VALUE, "");
-    _rowDataMap.put(LIST_KEY, "");
-    _rowDataMap.put(LIST_VALUE, "");
-    _rowDataMap.put(LIST_VALUE_INDEX, "");
-    _rowDataMap.put(MAP_KEY, "");
-    _rowDataMap.put(MAP_SUBKEY, "");
-    _rowDataMap.put(MAP_VALUE, "");
-    _rowDataMap.put(ZNRECORD_ID, "");
-  }
-
-  public String getField(String rowField) {
-    return _rowDataMap.get(rowField);
-  }
-
-  public void putField(String fieldName, String fieldValue) {
-    _rowDataMap.put(fieldName, fieldValue);
-  }
-
-  public String getListValueIndex() {
-    return getField(LIST_VALUE_INDEX);
-  }
-
-  public String getSimpleKey() {
-    return getField(SIMPLE_KEY);
-  }
-
-  public String getSimpleValue() {
-    return getField(SIMPLE_VALUE);
-  }
-
-  public String getListKey() {
-    return getField(LIST_KEY);
-  }
-
-  public String getListValue() {
-    return getField(LIST_VALUE);
-  }
-
-  public String getMapKey() {
-    return getField(MAP_KEY);
-  }
-
-  public String getMapSubKey() {
-    return getField(MAP_SUBKEY);
-  }
-
-  public String getMapValue() {
-    return getField(MAP_VALUE);
-  }
-
-  public String getRecordId() {
-    return getField(ZNRECORD_ID);
-  }
-
-  /* Josql function handlers */
-  public static String getField(ZNRecordRow row, String rowField) {
-    return row.getField(rowField);
-  }
-
-  public static List<ZNRecordRow> convertSimpleFields(ZNRecord record) {
-    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
-    for (String key : record.getSimpleFields().keySet()) {
-      ZNRecordRow row = new ZNRecordRow();
-      row.putField(ZNRECORD_ID, record.getId());
-      row.putField(SIMPLE_KEY, key);
-      row.putField(SIMPLE_VALUE, record.getSimpleField(key));
-      result.add(row);
-    }
-    return result;
-  }
-
-  public static List<ZNRecordRow> convertListFields(ZNRecord record) {
-    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
-    for (String key : record.getListFields().keySet()) {
-      int order = 0;
-      for (String value : record.getListField(key)) {
-        ZNRecordRow row = new ZNRecordRow();
-        row.putField(ZNRECORD_ID, record.getId());
-        row.putField(LIST_KEY, key);
-        row.putField(LIST_VALUE, record.getSimpleField(key));
-        row.putField(LIST_VALUE_INDEX, "" + order);
-        order++;
-        result.add(row);
-      }
-    }
-    return result;
-  }
-
-  public static List<ZNRecordRow> convertMapFields(ZNRecord record) {
-    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
-    for (String key0 : record.getMapFields().keySet()) {
-      for (String key1 : record.getMapField(key0).keySet()) {
-        ZNRecordRow row = new ZNRecordRow();
-        row.putField(ZNRECORD_ID, record.getId());
-        row.putField(MAP_KEY, key0);
-        row.putField(MAP_SUBKEY, key1);
-        row.putField(MAP_VALUE, record.getMapField(key0).get(key1));
-        result.add(row);
-      }
-    }
-    return result;
-  }
-
-  public static List<ZNRecordRow> flatten(ZNRecord record) {
-    List<ZNRecordRow> result = convertMapFields(record);
-    result.addAll(convertListFields(record));
-    result.addAll(convertSimpleFields(record));
-    return result;
-  }
-
-  public static List<ZNRecordRow> flatten(Collection<ZNRecord> recordList) {
-    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
-    for (ZNRecord record : recordList) {
-      result.addAll(flatten(record));
-    }
-    return result;
-  }
-
-  public static List<ZNRecordRow> getRowListFromMap(Map<String, List<ZNRecordRow>> rowMap,
-      String key) {
-    return rowMap.get(key);
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/main/java/org/apache/helix/josql/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/package-info.java b/helix-core/src/main/java/org/apache/helix/josql/package-info.java
deleted file mode 100644
index 550d569..0000000
--- a/helix-core/src/main/java/org/apache/helix/josql/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Jsql processor for Helix
- * 
- */
-package org.apache.helix.josql;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
index 3d2569e..9ca20af 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
@@ -19,80 +19,124 @@ package org.apache.helix.messaging;
  * under the License.
  */
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
 
 import org.apache.helix.Criteria;
+import org.apache.helix.Criteria.DataSource;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.josql.ClusterJosqlQueryProcessor;
-import org.apache.helix.josql.ZNRecordRow;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 public class CriteriaEvaluator {
   private static Logger logger = Logger.getLogger(CriteriaEvaluator.class);
 
+  /**
+   * Examine persisted data to match wildcards in {@link Criteria}
+   * @param recipientCriteria Criteria specifying the message destinations
+   * @param manager connection to the persisted data
+   * @return list of distinct matches, each a map of the evaluated criteria fields
+   */
   public List<Map<String, String>> evaluateCriteria(Criteria recipientCriteria, HelixManager manager) {
-    List<Map<String, String>> selected = new ArrayList<Map<String, String>>();
-
-    String queryFields =
-        (!recipientCriteria.getInstanceName().equals("") ? " " + ZNRecordRow.MAP_SUBKEY : " ''")
-            + ","
-            + (!recipientCriteria.getResource().equals("") ? " " + ZNRecordRow.ZNRECORD_ID : " ''")
-            + ","
-            + (!recipientCriteria.getPartition().equals("") ? " " + ZNRecordRow.MAP_KEY : " ''")
-            + ","
-            + (!recipientCriteria.getPartitionState().equals("") ? " " + ZNRecordRow.MAP_VALUE
-                : " '' ");
-
-    String matchCondition =
-        ZNRecordRow.MAP_SUBKEY
-            + " LIKE '"
-            + (!recipientCriteria.getInstanceName().equals("") ? (recipientCriteria
-                .getInstanceName() + "'") : "%' ")
-            + " AND "
-            + ZNRecordRow.ZNRECORD_ID
-            + " LIKE '"
-            + (!recipientCriteria.getResource().equals("") ? (recipientCriteria.getResource() + "'")
-                : "%' ")
-            + " AND "
-            + ZNRecordRow.MAP_KEY
-            + " LIKE '"
-            + (!recipientCriteria.getPartition().equals("") ? (recipientCriteria.getPartition() + "'")
-                : "%' ")
-            + " AND "
-            + ZNRecordRow.MAP_VALUE
-            + " LIKE '"
-            + (!recipientCriteria.getPartitionState().equals("") ? (recipientCriteria
-                .getPartitionState() + "'") : "%' ") + " AND " + ZNRecordRow.MAP_SUBKEY
-            + " IN ((SELECT [*]id FROM :LIVEINSTANCES))";
+    // get the data
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Set<Map<String, String>> selected = Sets.newHashSet();
+    List<HelixProperty> properties;
+    if (recipientCriteria.getDataSource() == DataSource.EXTERNALVIEW) {
+      properties = accessor.getChildValues(keyBuilder.externalViews());
+    } else if (recipientCriteria.getDataSource() == DataSource.IDEALSTATES) {
+      properties = accessor.getChildValues(keyBuilder.idealStates());
+    } else {
+      return Collections.emptyList();
+    }
 
-    String queryTarget =
-        recipientCriteria.getDataSource().toString() + ClusterJosqlQueryProcessor.FLATTABLE;
+    // flatten the data
+    List<ZNRecordRow> allRows = ZNRecordRow.flatten(HelixProperty.convertToList(properties));
 
-    String josql =
-        "SELECT DISTINCT " + queryFields + " FROM " + queryTarget + " WHERE " + matchCondition;
-    ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(manager);
-    List<Object> result = new ArrayList<Object>();
-    try {
-      logger.info("JOSQL query: " + josql);
-      result = p.runJoSqlQuery(josql, null, null);
-    } catch (Exception e) {
-      logger.error("", e);
-      return selected;
+    // save the matches
+    Set<String> liveParticipants = accessor.getChildValuesMap(keyBuilder.liveInstances()).keySet();
+    List<ZNRecordRow> result = Lists.newArrayList();
+    for (ZNRecordRow row : allRows) {
+      if (rowMatches(recipientCriteria, row) && liveParticipants.contains(row.getMapSubKey())) {
+        result.add(row);
+      }
     }
 
-    for (Object o : result) {
+    // deduplicate and convert the matches into the required format
+    for (ZNRecordRow row : result) {
       Map<String, String> resultRow = new HashMap<String, String>();
-      List<Object> row = (List<Object>) o;
-      resultRow.put("instanceName", (String) (row.get(0)));
-      resultRow.put("resourceName", (String) (row.get(1)));
-      resultRow.put("partitionName", (String) (row.get(2)));
-      resultRow.put("partitionState", (String) (row.get(3)));
+      resultRow.put("instanceName",
+          !recipientCriteria.getInstanceName().equals("") ? row.getMapSubKey() : "");
+      resultRow.put("resourceName", !recipientCriteria.getResource().equals("") ? row.getRecordId()
+          : "");
+      resultRow.put("partitionName", !recipientCriteria.getPartition().equals("") ? row.getMapKey()
+          : "");
+      resultRow.put("partitionState",
+          !recipientCriteria.getPartitionState().equals("") ? row.getMapValue() : "");
       selected.add(resultRow);
     }
-    logger.info("JOSQL query return " + selected.size() + " rows");
-    return selected;
+    logger.info("Query returned " + selected.size() + " rows");
+    return Lists.newArrayList(selected);
+  }
+
+  /**
+   * Check if a given row matches the specified criteria
+   * @param criteria the criteria
+   * @param row row of currently persisted data
+   * @return true if it matches, false otherwise
+   */
+  private boolean rowMatches(Criteria criteria, ZNRecordRow row) {
+    String instanceName = normalizePattern(criteria.getInstanceName());
+    String resourceName = normalizePattern(criteria.getResource());
+    String partitionName = normalizePattern(criteria.getPartition());
+    String partitionState = normalizePattern(criteria.getPartitionState());
+    return stringMatches(instanceName, row.getMapSubKey())
+        && stringMatches(resourceName, row.getRecordId())
+        && stringMatches(partitionName, row.getMapKey())
+        && stringMatches(partitionState, row.getMapValue());
+  }
+
+  /**
+   * Convert an SQL like expression into a Java matches expression
+   * @param pattern SQL like pattern ('%', '_' wildcards); null, "" or "*" means match all
+   * @return lower-cased Java matches expression (i.e. contains ".*?"s and '.'s)
+   */
+  private String normalizePattern(String pattern) {
+    if (pattern == null || pattern.equals("") || pattern.equals("*")) {
+      pattern = "%";
+    }
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < pattern.length(); i++) {
+      char ch = pattern.charAt(i);
+      if ("[](){}.*+?$^|#\\".indexOf(ch) != -1) {
+        // escape any reserved characters
+        builder.append("\\");
+      }
+      builder.append(ch);
+    }
+    // Locale.ROOT keeps lower-casing locale-independent (e.g. "I" stays ASCII "i" under tr_TR)
+    String escaped = builder.toString().toLowerCase(java.util.Locale.ROOT);
+    return escaped.replace("_", ".").replace("%", ".*?");
+  }
+
+  /**
+   * Check if a string matches a pattern
+   * @param pattern pattern allowed by Java regex matching
+   * @param value the string to check
+   * @return true if they match, false otherwise
+   */
+  private boolean stringMatches(String pattern, String value) {
+    Pattern p = Pattern.compile(pattern, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
+    return p.matcher(value).matches();
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/main/java/org/apache/helix/messaging/ZNRecordRow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/ZNRecordRow.java b/helix-core/src/main/java/org/apache/helix/messaging/ZNRecordRow.java
new file mode 100644
index 0000000..5a2effd
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/ZNRecordRow.java
@@ -0,0 +1,193 @@
+package org.apache.helix.messaging;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+
+/**
+ * A Normalized form of ZNRecord
+ */
+public class ZNRecordRow {
+  // "Field names" in the flattened ZNRecord
+  public static final String SIMPLE_KEY = "simpleKey";
+  public static final String SIMPLE_VALUE = "simpleValue";
+
+  public static final String LIST_KEY = "listKey";
+  public static final String LIST_VALUE = "listValue";
+  public static final String LIST_VALUE_INDEX = "listValueIndex";
+
+  public static final String MAP_KEY = "mapKey";
+  public static final String MAP_SUBKEY = "mapSubKey";
+  public static final String MAP_VALUE = "mapValue";
+  public static final String ZNRECORD_ID = "recordId";
+  // ZNRECORD path ?
+
+  final Map<String, String> _rowDataMap = new HashMap<String, String>();
+
+  public ZNRecordRow() {
+    _rowDataMap.put(SIMPLE_KEY, "");
+    _rowDataMap.put(SIMPLE_VALUE, "");
+    _rowDataMap.put(LIST_KEY, "");
+    _rowDataMap.put(LIST_VALUE, "");
+    _rowDataMap.put(LIST_VALUE_INDEX, "");
+    _rowDataMap.put(MAP_KEY, "");
+    _rowDataMap.put(MAP_SUBKEY, "");
+    _rowDataMap.put(MAP_VALUE, "");
+    _rowDataMap.put(ZNRECORD_ID, "");
+  }
+
+  public String getField(String rowField) {
+    return _rowDataMap.get(rowField);
+  }
+
+  public void putField(String fieldName, String fieldValue) {
+    _rowDataMap.put(fieldName, fieldValue);
+  }
+
+  public String getListValueIndex() {
+    return getField(LIST_VALUE_INDEX);
+  }
+
+  public String getSimpleKey() {
+    return getField(SIMPLE_KEY);
+  }
+
+  public String getSimpleValue() {
+    return getField(SIMPLE_VALUE);
+  }
+
+  public String getListKey() {
+    return getField(LIST_KEY);
+  }
+
+  public String getListValue() {
+    return getField(LIST_VALUE);
+  }
+
+  public String getMapKey() {
+    return getField(MAP_KEY);
+  }
+
+  public String getMapSubKey() {
+    return getField(MAP_SUBKEY);
+  }
+
+  public String getMapValue() {
+    return getField(MAP_VALUE);
+  }
+
+  public String getRecordId() {
+    return getField(ZNRECORD_ID);
+  }
+
+  /* Static helpers (originally written as Josql function handlers) */
+  public static String getField(ZNRecordRow row, String rowField) {
+    return row.getField(rowField);
+  }
+
+  public static List<ZNRecordRow> convertSimpleFields(ZNRecord record) {
+    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
+    for (String key : record.getSimpleFields().keySet()) {
+      ZNRecordRow row = new ZNRecordRow();
+      row.putField(ZNRECORD_ID, record.getId());
+      row.putField(SIMPLE_KEY, key);
+      row.putField(SIMPLE_VALUE, record.getSimpleField(key));
+      result.add(row);
+    }
+    return result;
+  }
+
+  /** Flattens each list field into one row per element: record id, list key, value, index. */
+  public static List<ZNRecordRow> convertListFields(ZNRecord record) {
+    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
+    for (String key : record.getListFields().keySet()) {
+      int order = 0; // position of the element within its list, starting at 0
+      for (String value : record.getListField(key)) {
+        ZNRecordRow row = new ZNRecordRow();
+        row.putField(ZNRECORD_ID, record.getId());
+        row.putField(LIST_KEY, key);
+        row.putField(LIST_VALUE, value); // the element itself, not getSimpleField(key)
+        row.putField(LIST_VALUE_INDEX, "" + order++);
+        result.add(row);
+      }
+    }
+    return result;
+  }
+
+  public static List<ZNRecordRow> convertMapFields(ZNRecord record) {
+    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
+    for (String key0 : record.getMapFields().keySet()) {
+      for (String key1 : record.getMapField(key0).keySet()) {
+        ZNRecordRow row = new ZNRecordRow();
+        row.putField(ZNRECORD_ID, record.getId());
+        row.putField(MAP_KEY, key0);
+        row.putField(MAP_SUBKEY, key1);
+        row.putField(MAP_VALUE, record.getMapField(key0).get(key1));
+        result.add(row);
+      }
+    }
+    return result;
+  }
+
+  public static List<ZNRecordRow> flatten(ZNRecord record) {
+    List<ZNRecordRow> result = convertMapFields(record);
+    result.addAll(convertListFields(record));
+    result.addAll(convertSimpleFields(record));
+    return result;
+  }
+
+  public static List<ZNRecordRow> flatten(Collection<ZNRecord> recordList) {
+    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
+    for (ZNRecord record : recordList) {
+      result.addAll(flatten(record));
+    }
+    return result;
+  }
+
+  public static List<ZNRecordRow> getRowListFromMap(Map<String, List<ZNRecordRow>> rowMap,
+      String key) {
+    return rowMap.get(key);
+  }
+
+  @Override
+  public String toString() {
+    return _rowDataMap.toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof ZNRecordRow) {
+      ZNRecordRow that = (ZNRecordRow) other;
+      return this._rowDataMap.equals(that._rowDataMap);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return _rowDataMap.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java b/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java
deleted file mode 100644
index 8090201..0000000
--- a/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package org.apache.helix.josql;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.helix.Criteria;
-import org.apache.helix.InstanceType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.josql.ZNRecordJosqlFunctionHandler;
-import org.apache.helix.josql.ZNRecordRow;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
-import org.apache.helix.tools.DefaultIdealStateCalculator;
-import org.josql.Query;
-import org.josql.QueryExecutionException;
-import org.josql.QueryParseException;
-import org.josql.QueryResults;
-import org.testng.annotations.Test;
-
-public class TestClusterJosqlQueryProcessor {
-  @Test(groups = {
-    "unitTest"
-  })
-  public void queryClusterDataSample() {
-    List<ZNRecord> liveInstances = new ArrayList<ZNRecord>();
-    Map<String, ZNRecord> liveInstanceMap = new HashMap<String, ZNRecord>();
-    List<String> instances = new ArrayList<String>();
-    for (int i = 0; i < 5; i++) {
-      String instance = "localhost_" + (12918 + i);
-      instances.add(instance);
-      ZNRecord metaData = new ZNRecord(instance);
-      metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), UUID.randomUUID()
-          .toString());
-      metaData.setSimpleField("SCN", "" + (10 - i));
-      liveInstances.add(metaData);
-      liveInstanceMap.put(instance, metaData);
-    }
-
-    // liveInstances.remove(0);
-    ZNRecord externalView =
-        DefaultIdealStateCalculator.calculateIdealState(instances, 21, 3, "TestDB", "MASTER",
-            "SLAVE");
-
-    Criteria criteria = new Criteria();
-    criteria.setInstanceName("%");
-    criteria.setResource("TestDB");
-    criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    criteria.setPartition("TestDB_2%");
-    criteria.setPartitionState("SLAVE");
-
-    String josql =
-        " SELECT DISTINCT mapSubKey AS 'subkey', mapValue AS 'mapValue' , getSimpleFieldValue(getZNRecordFromMap(:LIVEINSTANCESMAP, mapSubKey), 'SCN') AS 'SCN'"
-            + " FROM org.apache.helix.josql.ZNRecordRow "
-            + " WHERE mapKey LIKE 'TestDB_2%' "
-            + " AND mapSubKey LIKE '%' "
-            + " AND mapValue LIKE 'SLAVE' "
-            + " AND mapSubKey IN ((SELECT [*]id FROM :LIVEINSTANCES)) "
-            + " ORDER BY parseInt(getSimpleFieldValue(getZNRecordFromMap(:LIVEINSTANCESMAP, mapSubKey), 'SCN'))";
-
-    Query josqlQuery = new Query();
-    josqlQuery.setVariable("LIVEINSTANCES", liveInstances);
-    josqlQuery.setVariable("LIVEINSTANCESMAP", liveInstanceMap);
-    josqlQuery.addFunctionHandler(new ZNRecordRow());
-    josqlQuery.addFunctionHandler(new ZNRecordJosqlFunctionHandler());
-    josqlQuery.addFunctionHandler(new Integer(0));
-    try {
-      josqlQuery.parse(josql);
-      QueryResults qr = josqlQuery.execute(ZNRecordRow.convertMapFields(externalView));
-      @SuppressWarnings({
-          "unchecked", "unused"
-      })
-      List<Object> result = qr.getResults();
-
-    } catch (QueryParseException e) {
-      e.printStackTrace();
-    } catch (QueryExecutionException e) {
-      e.printStackTrace();
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java b/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
deleted file mode 100644
index c70f984..0000000
--- a/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
+++ /dev/null
@@ -1,224 +0,0 @@
-package org.apache.helix.josql;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.ZkStandAloneCMTestBase;
-import org.apache.helix.josql.ClusterJosqlQueryProcessor;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestJosqlProcessor extends ZkStandAloneCMTestBase {
-  @Test(groups = {
-    "integrationTest"
-  })
-  public void testJosqlQuery() throws Exception {
-    HelixManager manager = _participants[0];
-    // ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
-
-    // Find the instance name that contains partition TestDB_2 and state is 'MASTER'
-    String SQL =
-        "SELECT id  "
-            + "FROM LIVEINSTANCES "
-            + "WHERE getMapFieldValue( getZNRecordFromMap(:IDEALSTATES , 'TestDB'), :partitionName, :_currObj.id)='MASTER'";
-    Map<String, Object> bindVariables = new HashMap<String, Object>();
-    bindVariables.put("partitionName", "TestDB_2");
-
-    ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(manager);
-    List<Object> result = p.runJoSqlQuery(SQL, bindVariables, null);
-
-    Assert.assertEquals(result.size(), 1);
-    List<Object> firstList = (List<Object>) result.get(0);
-    Assert.assertTrue(((String) (firstList.get(0))).equalsIgnoreCase("localhost_12921"));
-
-    // Find the live instances names that hosts Partition TestDB_10 according to idealstate
-
-    SQL =
-        "SELECT id  "
-            + "FROM LIVEINSTANCES "
-            + "WHERE hasMapFieldKey( getZNRecordFromMap(:IDEALSTATES, 'TestDB'), :partitionName, :_currObj.id)='true'";
-    p = new ClusterJosqlQueryProcessor(manager);
-    bindVariables.put("partitionName", "TestDB_10");
-    result = p.runJoSqlQuery(SQL, bindVariables, null);
-
-    Assert.assertEquals(result.size(), 3);
-    Set<String> hosts = new HashSet<String>();
-    for (Object o : result) {
-      String val = (String) ((List<Object>) o).get(0);
-      hosts.add(val);
-    }
-    Assert.assertTrue(hosts.contains("localhost_12918"));
-    Assert.assertTrue(hosts.contains("localhost_12920"));
-    Assert.assertTrue(hosts.contains("localhost_12921"));
-
-    // Find the partitions on host localhost_12919 and is on MASTER state
-    SQL =
-        "SELECT id  "
-            + "FROM PARTITIONS "
-            + "WHERE getMapFieldValue( getZNRecordFromMap(:EXTERNALVIEW, 'TestDB'), id, :instanceName)='MASTER'";
-    p = new ClusterJosqlQueryProcessor(manager);
-    bindVariables.clear();
-    bindVariables.put("instanceName", "localhost_12919");
-    result = p.runJoSqlQuery(SQL, bindVariables, null);
-
-    Assert.assertEquals(result.size(), 4);
-    Set<String> partitions = new HashSet<String>();
-    for (Object o : result) {
-      String val = (String) ((List<Object>) o).get(0);
-      partitions.add(val);
-    }
-    Assert.assertTrue(partitions.contains("TestDB_6"));
-    Assert.assertTrue(partitions.contains("TestDB_7"));
-    Assert.assertTrue(partitions.contains("TestDB_9"));
-    Assert.assertTrue(partitions.contains("TestDB_14"));
-
-    // Find the partitions on host localhost_12919 and is on MASTER state
-    // Same as above but according to currentstates
-    SQL =
-        "SELECT id  "
-            + "FROM PARTITIONS "
-            + "WHERE getMapFieldValue( getZNRecordFromMap(:CURRENTSTATES, :instanceName, 'TestDB'), :_currObj.id, :mapFieldKey)=:partitionState";
-
-    p = new ClusterJosqlQueryProcessor(manager);
-    bindVariables.clear();
-    bindVariables.put("instanceName", "localhost_12919");
-    bindVariables.put("mapFieldKey", "CURRENT_STATE");
-    bindVariables.put("partitionState", "MASTER");
-
-    result = p.runJoSqlQuery(SQL, bindVariables, null);
-
-    Assert.assertEquals(result.size(), 4);
-    partitions.clear();
-    partitions = new HashSet<String>();
-    for (Object o : result) {
-      String val = (String) ((List<Object>) o).get(0);
-      partitions.add(val);
-    }
-    Assert.assertTrue(partitions.contains("TestDB_6"));
-    Assert.assertTrue(partitions.contains("TestDB_7"));
-    Assert.assertTrue(partitions.contains("TestDB_9"));
-    Assert.assertTrue(partitions.contains("TestDB_14"));
-
-    // get node name that hosts a certain partition with certain state
-
-    SQL =
-        "SELECT id  "
-            + "FROM LIVEINSTANCES "
-            + "WHERE getMapFieldValue( getZNRecordFromMap(:CURRENTSTATES, id, 'TestDB'), :partitionName, :mapFieldKey)=:partitionState";
-
-    p = new ClusterJosqlQueryProcessor(manager);
-    bindVariables.clear();
-    bindVariables.put("partitionName", "TestDB_8");
-    bindVariables.put("mapFieldKey", "CURRENT_STATE");
-    bindVariables.put("partitionState", "SLAVE");
-
-    result = p.runJoSqlQuery(SQL, bindVariables, null);
-
-    Assert.assertEquals(result.size(), 2);
-    partitions.clear();
-    partitions = new HashSet<String>();
-    for (Object o : result) {
-      String val = (String) ((List<Object>) o).get(0);
-      partitions.add(val);
-    }
-    Assert.assertTrue(partitions.contains("localhost_12918"));
-    Assert.assertTrue(partitions.contains("localhost_12922"));
-  }
-
-  @Test(groups = {
-    "unitTest"
-  })
-  public void parseFromTarget() {
-    ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(null);
-    String sql = "SELECT id  " + "FROM LIVEINSTANCES ";
-    String from = p.parseFromTarget(sql);
-    Assert.assertTrue(from.equals("LIVEINSTANCES"));
-
-    sql = "SELECT id      " + "FROM    LIVEINSTANCES  WHERE 1=2";
-
-    from = p.parseFromTarget(sql);
-    Assert.assertTrue(from.equals("LIVEINSTANCES"));
-
-    sql = "SELECT id      " + "FROM LIVEINSTANCES";
-
-    from = p.parseFromTarget(sql);
-    Assert.assertTrue(from.equals("LIVEINSTANCES"));
-
-    sql = "SELECT id      " + " LIVEINSTANCES where tt=00";
-    boolean exceptionThrown = false;
-    try {
-      from = p.parseFromTarget(sql);
-    } catch (HelixException e) {
-      exceptionThrown = true;
-    }
-    Assert.assertTrue(exceptionThrown);
-  }
-
-  @Test(groups = ("unitTest"))
-  public void testOrderby() throws Exception {
-    HelixManager manager = _participants[0];
-    // ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
-
-    Map<String, ZNRecord> scnMap = new HashMap<String, ZNRecord>();
-    for (int i = 0; i < NODE_NR; i++) {
-      String instance = "localhost_" + (12918 + i);
-      ZNRecord metaData = new ZNRecord(instance);
-      metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), UUID.randomUUID()
-          .toString());
-      metaData.setMapField("SCN", new HashMap<String, String>());
-      for (int j = 0; j < _PARTITIONS; j++) {
-        metaData.getMapField("SCN").put(TEST_DB + "_" + j, "" + i);
-      }
-      scnMap.put(instance, metaData);
-    }
-    Map<String, Object> bindVariables = new HashMap<String, Object>();
-    bindVariables.put("scnMap", scnMap);
-    String SQL =
-        " SELECT DISTINCT mapSubKey AS 'subkey', mapValue AS 'mapValue' , getMapFieldValue(getZNRecordFromMap(:scnMap, mapSubKey), 'SCN', mapKey) AS 'SCN'"
-            + " FROM EXTERNALVIEW.Table "
-            + " WHERE mapKey LIKE 'TestDB_1' "
-            + " AND mapSubKey LIKE '%' "
-            + " AND mapValue LIKE 'SLAVE' "
-            + " AND mapSubKey IN ((SELECT [*]id FROM :LIVEINSTANCES)) "
-            + " ORDER BY parseInt(getMapFieldValue(getZNRecordFromMap(:scnMap, mapSubKey), 'SCN', mapKey))";
-
-    ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(manager);
-    List<Object> result = p.runJoSqlQuery(SQL, bindVariables, null);
-    int prevSCN = -1;
-    for (Object row : result) {
-      List<String> stringRow = (List<String>) row;
-      Assert.assertTrue(stringRow.get(1).equals("SLAVE"));
-      int scn = Integer.parseInt(stringRow.get(2));
-      Assert.assertTrue(scn > prevSCN);
-      prevSCN = scn;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
index 9686e16..95abd29 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
@@ -179,30 +179,63 @@ public class TestDefaultMessagingService {
     recipientCriteria.setSelfExcluded(false);
     AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
 
+    // all instances, all partitions
     recipientCriteria.setSelfExcluded(false);
     recipientCriteria.setInstanceName("%");
     recipientCriteria.setResource("DB");
     recipientCriteria.setPartition("%");
     AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template));
 
-    recipientCriteria.setSelfExcluded(true);
-    recipientCriteria.setInstanceName("%");
+    // all instances, all partitions, use * instead of %
+    recipientCriteria.setSelfExcluded(false);
+    recipientCriteria.setInstanceName("*");
+    recipientCriteria.setResource("DB");
+    recipientCriteria.setPartition("*");
+    AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template));
+
+    // tail pattern
+    recipientCriteria.setSelfExcluded(false);
+    recipientCriteria.setInstanceName("localhost%");
     recipientCriteria.setResource("DB");
     recipientCriteria.setPartition("%");
-    AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
+    AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template));
 
+    // exclude this instance, send to all others for all partitions
     recipientCriteria.setSelfExcluded(true);
     recipientCriteria.setInstanceName("%");
     recipientCriteria.setResource("DB");
     recipientCriteria.setPartition("%");
     AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
 
+    // single instance, all partitions
     recipientCriteria.setSelfExcluded(true);
     recipientCriteria.setInstanceName("localhost_12920");
     recipientCriteria.setResource("DB");
     recipientCriteria.setPartition("%");
     AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template));
 
+    // single character wildcards
+    recipientCriteria.setSelfExcluded(true);
+    recipientCriteria.setInstanceName("l_calhost_12_20");
+    recipientCriteria.setResource("DB");
+    recipientCriteria.setPartition("%");
+    AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template));
+
+    // head pattern
+    recipientCriteria.setSelfExcluded(true);
+    recipientCriteria.setInstanceName("%12920");
+    recipientCriteria.setResource("DB");
+    recipientCriteria.setPartition("%");
+    AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template));
+
+    // middle pattern
+    recipientCriteria.setSelfExcluded(true);
+    recipientCriteria.setInstanceName("l%_12920");
+    recipientCriteria.setResource("DB");
+    recipientCriteria.setPartition("%");
+    AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template));
+
+    // send to a controller
     recipientCriteria.setSelfExcluded(true);
     recipientCriteria.setInstanceName("localhost_12920");
     recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);

http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f3907f8..77eb495 100644
--- a/pom.xml
+++ b/pom.xml
@@ -168,11 +168,6 @@ under the License.
       <name>SnakeYAML repository</name>
       <url>http://oss.sonatype.org/content/groups/public/</url>
     </repository>
-    <repository>
-      <id>jboss-fs-public</id>
-      <name>JBoss FuseSource repository</name>
-      <url>http://repository.jboss.org/nexus/content/groups/fs-public/</url>
-    </repository>
   </repositories>
 
  


[3/3] git commit: [HELIX-392] Write a test to ensure that ZK connection loss is silent

Posted by ka...@apache.org.
[HELIX-392] Write a test to ensure that ZK connection loss is silent


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/6e587b2d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/6e587b2d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/6e587b2d

Branch: refs/heads/helix-0.6.2-release
Commit: 6e587b2d1252fed54b42f6daefeee12242f22b1c
Parents: 5019d96
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Feb 26 17:55:08 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Mar 14 10:02:36 2014 -0700

----------------------------------------------------------------------
 .../TestCorrectnessOnConnectivityLoss.java      | 203 +++++++++++++++++++
 1 file changed, 203 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/6e587b2d/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
new file mode 100644
index 0000000..3b44f2c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
@@ -0,0 +1,203 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+public class TestCorrectnessOnConnectivityLoss {
+  private static final String ZK_ADDR = "localhost:2189";
+  private ZkServer _zkServer;
+  private String _clusterName;
+  private ClusterControllerManager _controller;
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    _zkServer = TestHelper.startZkServer(ZK_ADDR);
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    _clusterName = className + "_" + methodName;
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant start port
+        "localhost", // participant host
+        "resource", // resource name prefix
+        1, // number of resources
+        1, // number of partitions
+        1, // number of participants
+        1, // number of replicas
+        "OnlineOffline", // state model
+        RebalanceMode.FULL_AUTO, // automatic assignment
+        true); // rebalance
+
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, "controller0");
+    _controller.connect();
+  }
+
+  @Test
+  public void testParticipant() throws Exception {
+    Map<String, Integer> stateReachedCounts = Maps.newHashMap();
+    HelixManager participant =
+        HelixManagerFactory.getZKHelixManager(_clusterName, "localhost_12918",
+            InstanceType.PARTICIPANT, ZK_ADDR);
+    participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
+        new MyStateModelFactory(stateReachedCounts));
+    participant.connect();
+
+    Thread.sleep(1000);
+
+    // Ensure that the external view coalesces
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            _clusterName));
+    Assert.assertTrue(result);
+
+    // Ensure that there was only one state transition
+    Assert.assertEquals(stateReachedCounts.size(), 1);
+    Assert.assertTrue(stateReachedCounts.containsKey("ONLINE"));
+    Assert.assertEquals(stateReachedCounts.get("ONLINE").intValue(), 1);
+
+    // Now let's stop the ZK server; this should do nothing
+    TestHelper.stopZkServer(_zkServer);
+    Thread.sleep(1000);
+
+    // Verify no change
+    Assert.assertEquals(stateReachedCounts.size(), 1);
+    Assert.assertTrue(stateReachedCounts.containsKey("ONLINE"));
+    Assert.assertEquals(stateReachedCounts.get("ONLINE").intValue(), 1);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testSpectator() throws Exception {
+    Map<String, Integer> stateReachedCounts = Maps.newHashMap();
+    HelixManager participant =
+        HelixManagerFactory.getZKHelixManager(_clusterName, "localhost_12918",
+            InstanceType.PARTICIPANT, ZK_ADDR);
+    participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
+        new MyStateModelFactory(stateReachedCounts));
+    participant.connect();
+
+    RoutingTableProvider routingTableProvider = new RoutingTableProvider();
+    HelixManager spectator =
+        HelixManagerFactory.getZKHelixManager(_clusterName, "spectator", InstanceType.SPECTATOR,
+            ZK_ADDR);
+    spectator.connect();
+    spectator.addConfigChangeListener(routingTableProvider);
+    spectator.addExternalViewChangeListener(routingTableProvider);
+    Thread.sleep(1000);
+
+    // Now let's stop the ZK server; this should do nothing
+    TestHelper.stopZkServer(_zkServer);
+    Thread.sleep(1000);
+
+    // Verify routing table still works
+    Assert.assertEquals(routingTableProvider.getInstances("resource0", "ONLINE").size(), 1);
+    Assert.assertEquals(routingTableProvider.getInstances("resource0", "OFFLINE").size(), 0);
+  }
+
+  @AfterMethod
+  public void afterMethod() throws Exception {
+    TestHelper.stopZkServer(_zkServer);
+  }
+
+  @StateModelInfo(initialState = "OFFLINE", states = {
+      "MASTER", "SLAVE", "OFFLINE", "ERROR"
+  })
+  public static class MyStateModel extends StateModel {
+    private final Map<String, Integer> _counts;
+
+    public MyStateModel(Map<String, Integer> counts) {
+      _counts = counts;
+    }
+
+    @Transition(to = "ONLINE", from = "OFFLINE")
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+      incrementCount(message.getToState());
+    }
+
+    @Transition(to = "OFFLINE", from = "ONLINE")
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      incrementCount(message.getToState());
+    }
+
+    @Transition(to = "DROPPED", from = "OFFLINE")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      incrementCount(message.getToState());
+    }
+
+    @Transition(to = "OFFLINE", from = "ERROR")
+    public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+      incrementCount(message.getToState());
+    }
+
+    @Transition(to = "DROPPED", from = "ERROR")
+    public void onBecomeDroppedFromError(Message message, NotificationContext context) {
+      incrementCount(message.getToState());
+    }
+
+    @Override
+    public void rollbackOnError(Message message, NotificationContext context,
+        StateTransitionError error) {
+      incrementCount("rollback");
+    }
+
+    private synchronized void incrementCount(String toState) {
+      int current = (_counts.containsKey(toState)) ? _counts.get(toState) : 0;
+      _counts.put(toState, current + 1);
+    }
+  }
+
+  public static class MyStateModelFactory extends StateModelFactory<MyStateModel> {
+
+    private final Map<String, Integer> _counts;
+
+    public MyStateModelFactory(Map<String, Integer> counts) {
+      _counts = counts;
+    }
+
+    @Override
+    public MyStateModel createNewStateModel(String partitionId) {
+      return new MyStateModel(_counts);
+    }
+  }
+}