You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/03/14 18:21:14 UTC
[1/3] git commit: [HELIX-356] add a tool for grep zk
transaction/snapshot logs based on time, rb=16935
Repository: helix
Updated Branches:
refs/heads/helix-0.6.2-release c6cb2c2c8 -> 6e587b2d1
[HELIX-356] add a tool for grep zk transaction/snapshot logs based on time,rb=16935
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/efc1defd
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/efc1defd
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/efc1defd
Branch: refs/heads/helix-0.6.2-release
Commit: efc1defd0910eb6137318c7e48253ee736265da5
Parents: c6cb2c2
Author: zzhang <zz...@uci.edu>
Authored: Wed Jan 15 17:56:27 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Mar 13 18:51:44 2014 -0700
----------------------------------------------------------------------
helix-core/pom.xml | 8 +-
.../java/org/apache/helix/tools/ZkGrep.java | 641 +++++++++++++++++++
.../org/apache/helix/tools/ZkLogAnalyzer.java | 441 -------------
3 files changed, 645 insertions(+), 445 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/efc1defd/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index 32ff9f9..cc6b86e 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -211,10 +211,6 @@ under the License.
<name>start-standalone-zookeeper</name>
</program>
<program>
- <mainClass>org.apache.helix.tools.ZkLogAnalyzer</mainClass>
- <name>zk-log-analyzer</name>
- </program>
- <program>
<mainClass>org.apache.helix.examples.Quickstart</mainClass>
<name>quickstart</name>
</program>
@@ -230,6 +226,10 @@ under the License.
<mainClass>org.apache.helix.tools.IntegrationTestUtil</mainClass>
<name>test-util</name>
</program>
+ <program>
+ <mainClass>org.apache.helix.tools.ZkGrep</mainClass>
+ <name>zkgrep</name>
+ </program>
</programs>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/helix/blob/efc1defd/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java b/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java
new file mode 100644
index 0000000..3abf78f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java
@@ -0,0 +1,641 @@
+package org.apache.helix.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Utility to grep zk transaction/snapshot logs by time.
+ * - to grep for patterns by time t1 (last snapshot before t1 plus the transactions up to t1), use:
+ * zkgrep --zkCfg zkCfg --by t1 --pattern patterns...
+ * - to grep for patterns in the transactions between t1 and t2, use:
+ * zkgrep --zkCfg zkCfg --between t1 t2 --pattern patterns...
+ * For example, to find the fail-over latency between t1 and t2, use:
+ * 1) zkgrep --zkCfg zkCfg --by t1 --pattern "/{cluster}/LIVEINSTANCES/" | grep {fail-node}
+ * 2) zkgrep --zkCfg zkCfg --between t1 t2 --pattern "closeSession" | grep {fail-node session-id}
+ * 3) zkgrep --zkCfg zkCfg --between t1 t2 --pattern "/{cluster}" | grep "CURRENTSTATES" |
+ * grep "setData" | tail -1
+ * fail-over latency = timestamp difference between 2) and 3)
+ */
+public class ZkGrep {
+ // never reassigned, so it is declared final
+ private static final Logger LOG = Logger.getLogger(ZkGrep.class);
+
+ // long names of the command-line options
+ private static final String zkCfg = "zkCfg";
+ private static final String pattern = "pattern";
+ private static final String by = "by";
+ private static final String between = "between";
+
+ // used both as the mode argument handed to ZKLogFormatter and as the file-name fragment that
+ // selects transaction-log / snapshot files in a zk data dir
+ public static final String log = "log";
+ public static final String snapshot = "snapshot";
+
+ // suffix of gzip-compressed log/snapshot files
+ private static final String gzSuffix = ".gz";
+
+ /**
+  * Build the command-line options: --zkCfg (optional), --pattern (required), and a required
+  * option group holding --between and --by.
+  * @return the assembled options
+  */
+ @SuppressWarnings("static-access")
+ private static Options constructCommandLineOptions() {
+   Options cliOptions = new Options();
+
+   // --zkCfg <zoo.cfg>: optional, one argument
+   cliOptions.addOption(OptionBuilder.withLongOpt(zkCfg).withArgName("zoo.cfg")
+       .withDescription("provide zoo.cfg").hasArgs(1).isRequired(false).create());
+
+   // --pattern <patterns...>: required, any number of arguments
+   cliOptions.addOption(OptionBuilder.withLongOpt(pattern).withArgName("grep-patterns...")
+       .withDescription("provide patterns (required)").hasArgs().isRequired(true).create());
+
+   // --between <t1 t2> and --by <t> share one required option group
+   OptionGroup timeSelectors = new OptionGroup();
+   timeSelectors.setRequired(true);
+   timeSelectors.addOption(OptionBuilder.withLongOpt(between)
+       .withArgName("t1 t2 (timestamp in ms or yyMMdd_hhmmss_SSS)")
+       .withDescription("grep between t1 and t2").hasArgs(2).isRequired(false).create());
+   timeSelectors.addOption(OptionBuilder.withLongOpt(by)
+       .withArgName("t (timestamp in ms or yyMMdd_hhmmss_SSS)").withDescription("grep by t")
+       .hasArgs(1).isRequired(false).create());
+   cliOptions.addOptionGroup(timeSelectors);
+
+   return cliOptions;
+ }
+
+ /**
+ * get zk transaction log dir and zk snapshot log dir
+ * @param zkCfgFile
+ * @return String[0]: zk-transaction-log-dir, String[1]: zk-snapshot-dir
+ */
+ static String[] getZkDataDirs(String zkCfgFile) {
+ String[] zkDirs = new String[2];
+
+ FileInputStream fis = null;
+ BufferedReader br = null;
+ try {
+ fis = new FileInputStream(zkCfgFile);
+ br = new BufferedReader(new InputStreamReader(fis));
+
+ String line;
+ while ((line = br.readLine()) != null) {
+ String key = "dataDir=";
+ if (line.startsWith(key)) {
+ zkDirs[1] = zkDirs[0] = line.substring(key.length()) + "/version-2";
+ }
+
+ key = "dataLogDir=";
+ if (line.startsWith(key)) {
+ zkDirs[0] = line.substring(key.length()) + "/version-2";
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("exception in read file: " + zkCfgFile, e);
+ } finally {
+ try {
+ if (br != null) {
+ br.close();
+ }
+
+ if (fis != null) {
+ fis.close();
+ }
+
+ } catch (Exception e) {
+ LOG.error("exception in closing file: " + zkCfgFile, e);
+ }
+ }
+
+ return zkDirs;
+ }
+
+ /**
+  * Debug helper: print the name and last-modified time of each file to stdout, framed by
+  * "START print" / "END print" lines.
+  * @param files files to list
+  */
+ static void printFiles(File[] files) {
+   System.out.println("START print");
+   for (File entry : files) {
+     String name = entry.getName();
+     long modifiedAt = entry.lastModified();
+     System.out.println(name + ", " + modifiedAt);
+   }
+   System.out.println("END print");
+ }
+
+ /**
+  * List the regular files directly under a directory whose name contains a given fragment,
+  * ordered by ascending last-modified time.
+  * @param dirPath directory to list
+  * @param pattern plain substring (not a regex) that a file name must contain
+  * @return matching files, oldest first; an empty array if dirPath cannot be listed
+  */
+ static File[] getSortedFiles(String dirPath, final String pattern) {
+   File[] files = new File(dirPath).listFiles(new FileFilter() {
+
+     @Override
+     public boolean accept(File file) {
+       return file.isFile() && file.getName().indexOf(pattern) != -1;
+     }
+   });
+
+   // listFiles returns null when dirPath is not a directory or an I/O error occurs;
+   // sorting null would throw a NullPointerException
+   if (files == null) {
+     return new File[0];
+   }
+
+   Arrays.sort(files, new Comparator<File>() {
+
+     @Override
+     public int compare(File o1, File o2) {
+       // compare the long values directly rather than via a float signum of their difference
+       long t1 = o1.lastModified();
+       long t2 = o2.lastModified();
+       return t1 < t2 ? -1 : (t1 > t2 ? 1 : 0);
+     }
+
+   });
+   return files;
+ }
+
+ /**
+  * Look up the value of a "name:value" attribute in a line of a parsed zk log; e.g. for
+  * "time:1384984016778 session:0x14257d1d17e0004 cxid:0x5 zxid:0x46899 type:error err:-101"
+  * the attribute "time" yields "1384984016778".
+  * @param line parsed log line, split on single whitespace characters; may be null
+  * @param attribute attribute name, with or without the trailing ':'
+  * @return the text after "attribute:" in the first token that starts with it, or null if
+  *         line is null or no token matches
+  */
+ static String getAttributeValue(String line, String attribute) {
+   if (line == null) {
+     return null;
+   }
+
+   String prefix = attribute.endsWith(":") ? attribute : attribute + ":";
+   for (String token : line.split("\\s")) {
+     if (token.startsWith(prefix)) {
+       return token.substring(prefix.length());
+     }
+   }
+   return null;
+ }
+
+ /**
+  * Extract the "time" attribute of a parsed zk log line as a number.
+  * @param line parsed log line
+  * @return the value of the "time" attribute
+  * @throws NumberFormatException if the line has no "time" attribute or its value is not a
+  *         valid long
+  */
+ static long getTimestamp(String line) {
+   return Long.parseLong(getAttributeValue(line, "time"));
+ }
+
+ /**
+  * Parse a time string given either as a number (used as-is as the timestamp) or in
+  * "yyMMdd_HHmmss_SSS" form (24-hour clock).
+  * @param time time string
+  * @return timestamp, or -1 if the string is null or in neither form
+  */
+ static long parseTimeString(String time) {
+   if (time == null) {
+     LOG.error("fail to parse time string: null");
+     return -1;
+   }
+
+   try {
+     return Long.parseLong(time);
+   } catch (NumberFormatException nfe) {
+     // not a plain number; try the formatted form below
+   }
+
+   try {
+     // HH (hour of day, 0-23) rather than hh (clock hour, 1-12): with hh an hour of 12 is
+     // read as midnight, which shifts every time in the noon hour by twelve hours
+     SimpleDateFormat formatter = new SimpleDateFormat("yyMMdd_HHmmss_SSS");
+     Date date = formatter.parse(time);
+     return date.getTime();
+   } catch (java.text.ParseException pe) {
+     // log the date-parse failure itself, not the earlier NumberFormatException
+     LOG.error("fail to parse time string: " + time, pe);
+   }
+   return -1;
+ }
+
+ /**
+  * Print to stdout every line of a parsed zk transaction log whose "time" attribute lies in
+  * [start, end] and which contains all of the given patterns (plain substrings).
+  * Lines without a numeric "time" attribute are skipped; reading stops at the first line
+  * whose time is later than end. I/O failures are logged, not thrown.
+  * @param zkLog parsed zk transaction log
+  * @param start earliest timestamp to print (inclusive)
+  * @param end latest timestamp to print (inclusive)
+  * @param patterns substrings that must all occur in a printed line
+  */
+ public static void grepZkLog(File zkLog, long start, long end, String... patterns) {
+   FileInputStream in = null;
+   BufferedReader reader = null;
+   try {
+     in = new FileInputStream(zkLog);
+     reader = new BufferedReader(new InputStreamReader(in));
+
+     for (String entry = reader.readLine(); entry != null; entry = reader.readLine()) {
+       long entryTime;
+       try {
+         entryTime = getTimestamp(entry);
+       } catch (NumberFormatException ignored) {
+         // no usable time attribute on this line; skip it
+         continue;
+       }
+
+       if (entryTime > end) {
+         break;
+       }
+       if (entryTime < start) {
+         continue;
+       }
+
+       boolean allFound = true;
+       for (int i = 0; allFound && i < patterns.length; i++) {
+         allFound = entry.indexOf(patterns[i]) != -1;
+       }
+       if (allFound) {
+         System.out.println(entry);
+       }
+     }
+   } catch (Exception e) {
+     LOG.error("exception in grep zk-log: " + zkLog, e);
+   } finally {
+     try {
+       if (reader != null) {
+         reader.close();
+       }
+
+       if (in != null) {
+         in.close();
+       }
+
+     } catch (Exception e) {
+       LOG.error("exception in closing zk-log: " + zkLog, e);
+     }
+   }
+ }
+
+ /**
+  * Run {@link #grepZkLog} over each parsed zk transaction log, in list order.
+  * @param parsedZkLogs parsed zk transaction logs
+  * @param start earliest timestamp to print (inclusive)
+  * @param end latest timestamp to print (inclusive)
+  * @param patterns substrings that must all occur in a printed line
+  */
+ public static void grepZkLogDir(List<File> parsedZkLogs, long start, long end, String... patterns) {
+   for (int i = 0; i < parsedZkLogs.size(); i++) {
+     grepZkLog(parsedZkLogs.get(i), start, end, patterns);
+   }
+ }
+
+ /**
+  * Print to stdout every line of a parsed zk snapshot that contains all of the given
+  * patterns (plain substrings). I/O failures are logged, not thrown.
+  * @param zkSnapshot parsed zk snapshot
+  * @param patterns substrings that must all occur in a printed line
+  */
+ public static void grepZkSnapshot(File zkSnapshot, String... patterns) {
+   FileInputStream fis = null;
+   BufferedReader br = null;
+   try {
+     fis = new FileInputStream(zkSnapshot);
+     br = new BufferedReader(new InputStreamReader(fis));
+
+     // no number parsing happens here, so the per-line NumberFormatException handler that
+     // used to wrap this loop body could never fire and has been dropped
+     String line;
+     while ((line = br.readLine()) != null) {
+       boolean match = true;
+       for (String wanted : patterns) {
+         if (line.indexOf(wanted) == -1) {
+           match = false;
+           break;
+         }
+       }
+
+       if (match) {
+         System.out.println(line);
+       }
+     }
+   } catch (Exception e) {
+     LOG.error("exception in grep zk-snapshot: " + zkSnapshot, e);
+   } finally {
+     try {
+       if (br != null) {
+         br.close();
+       }
+
+       if (fis != null) {
+         fis.close();
+       }
+
+     } catch (Exception e) {
+       LOG.error("exception in closing zk-snapshot: " + zkSnapshot, e);
+     }
+   }
+ }
+
+ /**
+ * Guess the zoo.cfg location when --zkCfg is not given.
+ * Not implemented yet: always returns null, which the caller treats as "not found".
+ * @return null for now; intended to be the absolute path to zoo.cfg
+ */
+ static String guessZkCfgDir() {
+ // TODO implement this
+ return null;
+ }
+
+ /**
+  * Print the usage message for this tool.
+  * @param cliOptions options to describe
+  */
+ public static void printUsage(Options cliOptions) {
+   HelpFormatter usagePrinter = new HelpFormatter();
+   // presumably set this wide so the long argument names are not wrapped
+   usagePrinter.setWidth(1000);
+   String commandLine = "java " + ZkGrep.class.getName();
+   usagePrinter.printHelp(commandLine, cliOptions);
+ }
+
+ /**
+  * Parse the zk transaction logs that may hold entries between start and end, unless an
+  * up-to-date parsed copy already exists under ${user.home}/zklog-parsed.
+  * Logs last modified before start are skipped. The first log last modified after end is
+  * still included, and iteration stops after it. Gzipped logs are copied to the parsed dir
+  * and gunzipped there; those temporary copies are removed again even when parsing fails.
+  * A log that fails to parse is logged and left out of the result.
+  * @param zkLogDir zk transaction log dir
+  * @param start start of the time window
+  * @param end end of the time window
+  * @return list of parsed zklogs between start and end, in order of last modified timestamp
+  */
+ static List<File> parseZkLogs(String zkLogDir, long start, long end) {
+   File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home")));
+   List<File> parsedZkLogs = new ArrayList<File>();
+
+   File[] zkLogs = getSortedFiles(zkLogDir, log);
+   if (zkLogs == null) {
+     LOG.error("fail to list zk transaction logs under: " + zkLogDir);
+     return parsedZkLogs;
+   }
+
+   for (File zkLog : zkLogs) {
+     long lastModified = zkLog.lastModified();
+     if (lastModified < start) {
+       continue;
+     }
+
+     File parsedZkLog = new File(zkParsedDir, stripGzSuffix(zkLog.getName()) + ".parsed");
+     File zkLogGz = null;
+     File tmpZkLog = null;
+     try {
+       if (!parsedZkLog.exists() || parsedZkLog.lastModified() <= lastModified) {
+         File toParse = zkLog;
+         if (zkLog.getName().endsWith(gzSuffix)) {
+           // work on a gunzipped copy in the parsed dir; the original is left untouched
+           FileUtils.copyFileToDirectory(zkLog, zkParsedDir);
+           zkLogGz = new File(zkParsedDir, zkLog.getName());
+           tmpZkLog = gunzip(zkLogGz);
+           if (tmpZkLog == null) {
+             throw new IOException("fail to gunzip: " + zkLogGz);
+           }
+           toParse = tmpZkLog;
+         }
+
+         ZKLogFormatter.main(new String[] {
+             log, toParse.getAbsolutePath(), parsedZkLog.getAbsolutePath()
+         });
+       }
+       parsedZkLogs.add(parsedZkLog);
+     } catch (Exception e) {
+       LOG.error("fail to parse zkLog: " + zkLog, e);
+     } finally {
+       // remove the temporary copies whether or not parsing succeeded
+       if (zkLogGz != null) {
+         zkLogGz.delete();
+       }
+       if (tmpZkLog != null) {
+         tmpZkLog.delete();
+       }
+     }
+
+     // this log was last modified after the window; later logs need not be looked at
+     if (lastModified > end) {
+       break;
+     }
+   }
+
+   return parsedZkLogs;
+ }
+
+ /**
+  * Remove a trailing ".gz" from a file name.
+  * @param filename file name or path
+  * @return filename without its ".gz" suffix, or filename unchanged if it has none
+  */
+ static String stripGzSuffix(String filename) {
+   boolean gzipped = filename.endsWith(gzSuffix);
+   return gzipped ? filename.substring(0, filename.length() - gzSuffix.length()) : filename;
+ }
+
+ /**
+  * Gunzip a file next to itself; the output path is the input path without its ".gz" suffix.
+  * Streams are closed on every path, and a partially written output is removed on failure.
+  * @param zipFile gzip file whose name ends with ".gz"
+  * @return the gunzipped file, or null on failure (the failure is logged)
+  */
+ static File gunzip(File zipFile) {
+   File outputFile = new File(stripGzSuffix(zipFile.getAbsolutePath()));
+   if (outputFile.equals(zipFile.getAbsoluteFile())) {
+     // without a .gz suffix the output path equals the input path, and opening the output
+     // would truncate the input
+     LOG.error("fail to gunzip file: " + zipFile + ", no " + gzSuffix + " suffix");
+     return null;
+   }
+
+   byte[] buffer = new byte[1024];
+
+   FileInputStream fis = null;
+   GZIPInputStream gzis = null;
+   FileOutputStream out = null;
+   try {
+     fis = new FileInputStream(zipFile);
+     gzis = new GZIPInputStream(fis);
+     out = new FileOutputStream(outputFile);
+
+     int len;
+     while ((len = gzis.read(buffer)) > 0) {
+       out.write(buffer, 0, len);
+     }
+
+     // close explicitly so that a failure while closing is still reported as a failure
+     out.close();
+     return outputFile;
+   } catch (IOException e) {
+     LOG.error("fail to gunzip file: " + zipFile, e);
+     if (out != null) {
+       // the output was opened, so whatever it holds now is incomplete
+       closeQuietly(out);
+       outputFile.delete();
+     }
+   } finally {
+     closeQuietly(out);
+     closeQuietly(gzis);
+     closeQuietly(fis);
+   }
+
+   return null;
+ }
+
+ /** Close a stream, ignoring null and any failure while closing. */
+ private static void closeQuietly(java.io.Closeable stream) {
+   if (stream == null) {
+     return;
+   }
+   try {
+     stream.close();
+   } catch (IOException ignored) {
+     // nothing useful can be done about a failed close during cleanup
+   }
+ }
+
+ /**
+  * Find the last zk snapshot last-modified before byTime and parse it, unless an up-to-date
+  * parsed copy already exists under ${user.home}/zklog-parsed.
+  * A gzipped snapshot is copied to the parsed dir and gunzipped there; those temporary copies
+  * are removed again even when parsing fails.
+  * @param zkSnapshotDir zk snapshot dir
+  * @param byTime only snapshots last modified strictly before this timestamp are considered
+  * @return File array whose first element is the last zk-snapshot by by-time and whose second
+  *         element is its parsed file; null if there is no such snapshot or parsing it fails
+  */
+ static File[] parseZkSnapshot(String zkSnapshotDir, long byTime) {
+   File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home")));
+   File[] zkSnapshots = getSortedFiles(zkSnapshotDir, snapshot);
+
+   // snapshots are sorted oldest first, so the last one seen before byTime is the wanted one
+   File lastZkSnapshot = null;
+   if (zkSnapshots != null) {
+     for (File zkSnapshot : zkSnapshots) {
+       if (zkSnapshot.lastModified() >= byTime) {
+         break;
+       }
+       lastZkSnapshot = zkSnapshot;
+     }
+   }
+
+   if (lastZkSnapshot == null) {
+     // report the real cause instead of running into a NullPointerException below
+     LOG.error("no zk snapshot under " + zkSnapshotDir + " was last modified before " + byTime);
+     return null;
+   }
+
+   File lastZkSnapshotGz = null;
+   File tmpLastZkSnapshot = null;
+   try {
+     File parsedZkSnapshot =
+         new File(zkParsedDir, stripGzSuffix(lastZkSnapshot.getName()) + ".parsed");
+     if (!parsedZkSnapshot.exists()
+         || parsedZkSnapshot.lastModified() <= lastZkSnapshot.lastModified()) {
+
+       File toParse = lastZkSnapshot;
+       if (lastZkSnapshot.getName().endsWith(gzSuffix)) {
+         // work on a gunzipped copy in the parsed dir; the original is left untouched
+         FileUtils.copyFileToDirectory(lastZkSnapshot, zkParsedDir);
+         lastZkSnapshotGz = new File(zkParsedDir, lastZkSnapshot.getName());
+         tmpLastZkSnapshot = gunzip(lastZkSnapshotGz);
+         if (tmpLastZkSnapshot == null) {
+           throw new IOException("fail to gunzip: " + lastZkSnapshotGz);
+         }
+         toParse = tmpLastZkSnapshot;
+       }
+
+       ZKLogFormatter.main(new String[] {
+           snapshot, toParse.getAbsolutePath(), parsedZkSnapshot.getAbsolutePath()
+       });
+     }
+     return new File[] {
+         lastZkSnapshot, parsedZkSnapshot
+     };
+   } catch (Exception e) {
+     LOG.error("fail to parse zkSnapshot: " + lastZkSnapshot, e);
+   } finally {
+     // remove the temporary copies whether or not parsing succeeded
+     if (lastZkSnapshotGz != null) {
+       lastZkSnapshotGz.delete();
+     }
+     if (tmpLastZkSnapshot != null) {
+       tmpLastZkSnapshot.delete();
+     }
+   }
+
+   return null;
+ }
+
+ public static void processCommandLineArgs(String[] cliArgs) {
+ CommandLineParser cliParser = new GnuParser();
+ Options cliOptions = constructCommandLineOptions();
+ CommandLine cmd = null;
+
+ try {
+ cmd = cliParser.parse(cliOptions, cliArgs);
+ } catch (ParseException pe) {
+ System.err.println("CommandLineClient: failed to parse command-line options: " + pe);
+ printUsage(cliOptions);
+ System.exit(1);
+ }
+
+ String zkCfgDirValue = null;
+ String zkCfgFile = null;
+
+ if (cmd.hasOption(zkCfg)) {
+ zkCfgDirValue = cmd.getOptionValue(zkCfg);
+ }
+
+ if (zkCfgDirValue == null) {
+ zkCfgDirValue = guessZkCfgDir();
+ }
+
+ if (zkCfgDirValue == null) {
+ LOG.error("couldn't figure out path to zkCfg file");
+ System.exit(1);
+ }
+
+ // get zoo.cfg path from cfg-dir
+ zkCfgFile = zkCfgDirValue;
+ if (!zkCfgFile.endsWith(".cfg")) {
+ // append with default zoo.cfg
+ zkCfgFile = zkCfgFile + "/zoo.cfg";
+ }
+
+ if (!new File(zkCfgFile).exists()) {
+ LOG.error("zoo.cfg file doen't exist: " + zkCfgFile);
+ System.exit(1);
+ }
+
+ String[] patterns = cmd.getOptionValues(pattern);
+
+ String[] zkDataDirs = getZkDataDirs(zkCfgFile);
+
+ // parse zk data files
+ if (zkDataDirs == null || zkDataDirs[0] == null || zkDataDirs[1] == null) {
+ LOG.error("invalid zkCfgDir: " + zkCfgDirValue);
+ System.exit(1);
+ }
+
+ File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home")));
+ if (!zkParsedDir.exists()) {
+ LOG.info("creating zklog-parsed dir: " + zkParsedDir.getAbsolutePath());
+ zkParsedDir.mkdir();
+ }
+
+ if (cmd.hasOption(between)) {
+ String[] timeStrings = cmd.getOptionValues(between);
+
+ long startTime = parseTimeString(timeStrings[0]);
+ if (startTime == -1) {
+ LOG.error("invalid start time string: " + timeStrings[0]
+ + ", should be either timestamp or yyMMdd_hhmmss_SSS");
+ System.exit(1);
+ }
+
+ long endTime = parseTimeString(timeStrings[1]);
+ if (endTime == -1) {
+ LOG.error("invalid end time string: " + timeStrings[1]
+ + ", should be either timestamp or yyMMdd_hhmmss_SSS");
+ System.exit(1);
+ }
+
+ if (startTime > endTime) {
+ LOG.warn("empty window: " + startTime + " - " + endTime);
+ System.exit(1);
+ }
+ // zkDataDirs[0] is the transaction log dir
+ List<File> parsedZkLogs = parseZkLogs(zkDataDirs[0], startTime, endTime);
+ grepZkLogDir(parsedZkLogs, startTime, endTime, patterns);
+
+ } else if (cmd.hasOption(by)) {
+ String timeString = cmd.getOptionValue(by);
+
+ long byTime = parseTimeString(timeString);
+ if (byTime == -1) {
+ LOG.error("invalid by time string: " + timeString
+ + ", should be either timestamp or yyMMdd_hhmmss_SSS");
+ System.exit(1);
+ }
+
+ // zkDataDirs[1] is the snapshot dir
+ File[] lastZkSnapshot = parseZkSnapshot(zkDataDirs[1], byTime);
+
+ // lastZkSnapshot[1] is the parsed last snapshot by byTime
+ grepZkSnapshot(lastZkSnapshot[1], patterns);
+
+ // need to grep transaction logs between last-modified-time of snapshot and byTime also
+ // lastZkSnapshot[0] is the last snapshot by byTime
+ long startTime = lastZkSnapshot[0].lastModified();
+
+ // zkDataDirs[0] is the transaction log dir
+ List<File> parsedZkLogs = parseZkLogs(zkDataDirs[0], startTime, byTime);
+ grepZkLogDir(parsedZkLogs, startTime, byTime, patterns);
+ }
+ }
+
+ /**
+  * Command-line entry point; hands the arguments to processCommandLineArgs.
+  * @param args command-line arguments
+  */
+ public static void main(String[] args) {
+   ZkGrep.processCommandLineArgs(args);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/efc1defd/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java b/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
deleted file mode 100644
index 11e1b66..0000000
--- a/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
+++ /dev/null
@@ -1,441 +0,0 @@
-package org.apache.helix.tools;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.log4j.Logger;
-
-public class ZkLogAnalyzer {
- private static Logger LOG = Logger.getLogger(ZkLogAnalyzer.class);
- private static boolean dump = false;;
- final static ZNRecordSerializer _deserializer = new ZNRecordSerializer();
-
- static class Stats {
- int msgSentCount = 0;
- int msgSentCount_O2S = 0; // Offline to Slave
- int msgSentCount_S2M = 0; // Slave to Master
- int msgSentCount_M2S = 0; // Master to Slave
- int msgDeleteCount = 0;
- int msgModifyCount = 0;
- int curStateCreateCount = 0;
- int curStateUpdateCount = 0;
- int extViewCreateCount = 0;
- int extViewUpdateCount = 0;
- }
-
- static String getAttributeValue(String line, String attribute) {
- if (line == null)
- return null;
- String[] parts = line.split("\\s");
- if (parts != null && parts.length > 0) {
- for (int i = 0; i < parts.length; i++) {
- if (parts[i].startsWith(attribute)) {
- String val = parts[i].substring(attribute.length());
- return val;
- }
- }
- }
- return null;
- }
-
- static String findLastCSUpdateBetween(List<String> csUpdateLines, long start, long end) {
- long lastCSUpdateTimestamp = Long.MIN_VALUE;
- String lastCSUpdateLine = null;
- for (String line : csUpdateLines) {
- // ZNRecord record = getZNRecord(line);
- long timestamp = Long.parseLong(getAttributeValue(line, "time:"));
- if (timestamp >= start && timestamp <= end && timestamp > lastCSUpdateTimestamp) {
- lastCSUpdateTimestamp = timestamp;
- lastCSUpdateLine = line;
- }
- }
- assert (lastCSUpdateLine != null) : "No CS update between " + start + " - " + end;
- return lastCSUpdateLine;
- }
-
- static ZNRecord getZNRecord(String line) {
- ZNRecord record = null;
- String value = getAttributeValue(line, "data:");
- if (value != null) {
- record = (ZNRecord) _deserializer.deserialize(value.getBytes());
- // if (record == null)
- // {
- // System.out.println(line);
- // }
- }
- return record;
- }
-
- public static void main(String[] args) throws Exception {
- if (args.length != 3) {
- System.err
- .println("USAGE: ZkLogAnalyzer zkLogDir clusterName testStartTime (yyMMdd_hhmmss_SSS)");
- System.exit(1);
- }
-
- System.out.println("ZkLogAnalyzer called with args: " + Arrays.toString(args));
- // get create-timestamp of "/" + clusterName
- // find all zk logs after that create-timestamp and parse them
- // save parsed log in /tmp/zkLogAnalyzor_zklog.parsed0,1,2...
-
- String zkLogDir = args[0];
- String clusterName = args[1];
- // String zkAddr = args[2];
- String startTimeStr = args[2];
- // ZkClient zkClient = new ZkClient(zkAddr);
- // Stat clusterCreateStat = zkClient.getStat("/" + clusterName);
- SimpleDateFormat formatter = new SimpleDateFormat("yyMMdd_hhmmss_SSS");
- Date date = formatter.parse(startTimeStr);
- long startTimeStamp = date.getTime();
-
- System.out.println(clusterName + " created at " + date);
- while (zkLogDir.endsWith("/")) {
- zkLogDir = zkLogDir.substring(0, zkLogDir.length() - 1);
- }
- if (!zkLogDir.endsWith("/version-2")) {
- zkLogDir = zkLogDir + "/version-2";
- }
- File dir = new File(zkLogDir);
- File[] zkLogs = dir.listFiles(new FileFilter() {
-
- @Override
- public boolean accept(File file) {
- return file.isFile() && (file.getName().indexOf("log") != -1);
- }
- });
-
- // lastModified time -> zkLog
- TreeMap<Long, String> lastZkLogs = new TreeMap<Long, String>();
- for (File file : zkLogs) {
- if (file.lastModified() > startTimeStamp) {
- lastZkLogs.put(file.lastModified(), file.getAbsolutePath());
- }
- }
-
- List<String> parsedZkLogs = new ArrayList<String>();
- int i = 0;
- System.out.println("zk logs last modified later than " + new Timestamp(startTimeStamp));
- for (Long lastModified : lastZkLogs.keySet()) {
- String fileName = lastZkLogs.get(lastModified);
- System.out.println(new Timestamp(lastModified) + ": "
- + (fileName.substring(fileName.lastIndexOf('/') + 1)));
-
- String parsedFileName = "zkLogAnalyzor_zklog.parsed" + i;
- i++;
- ZKLogFormatter.main(new String[] {
- "log", fileName, parsedFileName
- });
- parsedZkLogs.add(parsedFileName);
- }
-
- // sessionId -> create liveInstance line
- Map<String, String> sessionMap = new HashMap<String, String>();
-
- // message send lines in time order
- // List<String> sendMessageLines = new ArrayList<String>();
-
- // CS update lines in time order
- List<String> csUpdateLines = new ArrayList<String>();
-
- String leaderSession = null;
-
- System.out.println();
- Stats stats = new Stats();
- long lastTestStartTimestamp = Long.MAX_VALUE;
- long controllerStartTime = 0;
- for (String parsedZkLog : parsedZkLogs) {
-
- FileInputStream fis = new FileInputStream(parsedZkLog);
- BufferedReader br = new BufferedReader(new InputStreamReader(fis));
-
- String inputLine;
- while ((inputLine = br.readLine()) != null) {
- String timestamp = getAttributeValue(inputLine, "time:");
- if (timestamp == null) {
- continue;
- }
- long timestampVal = Long.parseLong(timestamp);
- if (timestampVal < startTimeStamp) {
- continue;
- }
-
- if (dump == true) {
- String printLine = inputLine.replaceAll("data:.*", "");
- printLine =
- new Timestamp(timestampVal) + " "
- + printLine.substring(printLine.indexOf("session:"));
- System.err.println(printLine);
- }
-
- if (inputLine.indexOf("/start_disable") != -1) {
- dump = true;
- }
- if (inputLine.indexOf("/" + clusterName + "/CONFIGS/CLUSTER/verify") != -1) {
- String type = getAttributeValue(inputLine, "type:");
- if (type.equals("delete")) {
- System.out.println(timestamp + ": verify done");
- System.out.println("lastTestStartTimestamp:" + lastTestStartTimestamp);
- String lastCSUpdateLine =
- findLastCSUpdateBetween(csUpdateLines, lastTestStartTimestamp, timestampVal);
- long lastCSUpdateTimestamp =
- Long.parseLong(getAttributeValue(lastCSUpdateLine, "time:"));
- System.out.println("Last CS Update:" + lastCSUpdateTimestamp);
-
- System.out.println("state transition latency: "
- + +(lastCSUpdateTimestamp - lastTestStartTimestamp) + "ms");
-
- System.out.println("state transition latency since controller start: "
- + +(lastCSUpdateTimestamp - controllerStartTime) + "ms");
-
- System.out.println("Create MSG\t" + stats.msgSentCount + "\t" + stats.msgSentCount_O2S
- + "\t" + stats.msgSentCount_S2M + "\t" + stats.msgSentCount_M2S);
- System.out.println("Modify MSG\t" + stats.msgModifyCount);
- System.out.println("Delete MSG\t" + stats.msgDeleteCount);
- System.out.println("Create CS\t" + stats.curStateCreateCount);
- System.out.println("Update CS\t" + stats.curStateUpdateCount);
- System.out.println("Create EV\t" + stats.extViewCreateCount);
- System.out.println("Update EV\t" + stats.extViewUpdateCount);
-
- System.out.println();
- stats = new Stats();
- lastTestStartTimestamp = Long.MAX_VALUE;
- }
- } else if (inputLine.indexOf("/" + clusterName + "/LIVEINSTANCES/") != -1) {
- // cluster startup
- if (timestampVal < lastTestStartTimestamp) {
- System.out.println("START cluster. SETTING lastTestStartTimestamp to "
- + new Timestamp(timestampVal) + "\nline:" + inputLine);
- lastTestStartTimestamp = timestampVal;
- }
-
- ZNRecord record = getZNRecord(inputLine);
- LiveInstance liveInstance = new LiveInstance(record);
- String session = getAttributeValue(inputLine, "session:");
- sessionMap.put(session, inputLine);
- System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LIVEINSTANCE "
- + liveInstance.getInstanceName());
- } else if (inputLine.indexOf("closeSession") != -1) {
- // kill any instance
- String session = getAttributeValue(inputLine, "session:");
- if (sessionMap.containsKey(session)) {
- if (timestampVal < lastTestStartTimestamp) {
- System.out.println("KILL node. SETTING lastTestStartTimestamp to " + timestampVal
- + " line:" + inputLine);
- lastTestStartTimestamp = timestampVal;
- }
- String line = sessionMap.get(session);
- ZNRecord record = getZNRecord(line);
- LiveInstance liveInstance = new LiveInstance(record);
-
- System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": close session "
- + liveInstance.getInstanceName());
- dump = true;
- }
- } else if (inputLine.indexOf("/" + clusterName + "/CONFIGS/PARTICIPANT") != -1) {
- // disable a partition
- String type = getAttributeValue(inputLine, "type:");
- if (type.equals("setData") && inputLine.indexOf("HELIX_DISABLED_PARTITION") != -1) {
- if (timestampVal < lastTestStartTimestamp) {
- System.out.println("DISABLE partition. SETTING lastTestStartTimestamp to "
- + timestampVal + " line:" + inputLine);
- lastTestStartTimestamp = timestampVal;
- }
- }
- } else if (inputLine.indexOf("/" + clusterName + "/CONTROLLER/LEADER") != -1) {
- // leaderLine = inputLine;
- ZNRecord record = getZNRecord(inputLine);
- LiveInstance liveInstance = new LiveInstance(record);
- String session = getAttributeValue(inputLine, "session:");
- leaderSession = session;
- controllerStartTime = Long.parseLong(getAttributeValue(inputLine, "time:"));
- sessionMap.put(session, inputLine);
- System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LEADER "
- + liveInstance.getInstanceName());
- } else if (inputLine.indexOf("/" + clusterName + "/") != -1
- && inputLine.indexOf("/CURRENTSTATES/") != -1) {
- String type = getAttributeValue(inputLine, "type:");
- if (type.equals("create")) {
- stats.curStateCreateCount++;
- } else if (type.equals("setData")) {
- String path = getAttributeValue(inputLine, "path:");
- csUpdateLines.add(inputLine);
- stats.curStateUpdateCount++;
- // getAttributeValue(line, "data");
- System.out.println("Update currentstate:" + new Timestamp(Long.parseLong(timestamp))
- + ":" + timestamp + " path:" + path);
- }
- } else if (inputLine.indexOf("/" + clusterName + "/EXTERNALVIEW/") != -1) {
- String session = getAttributeValue(inputLine, "session:");
- if (session.equals(leaderSession)) {
- String type = getAttributeValue(inputLine, "type:");
- if (type.equals("create")) {
- stats.extViewCreateCount++;
- } else if (type.equals("setData")) {
- stats.extViewUpdateCount++;
- }
- }
-
- // pos = inputLine.indexOf("EXTERNALVIEW");
- // pos = inputLine.indexOf("data:{", pos);
- // if (pos != -1)
- // {
- // String timestamp = getAttributeValue(inputLine, "time:");
- // ZNRecord record =
- // (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
- // .getBytes());
- // ExternalView extView = new ExternalView(record);
- // int masterCnt = ClusterStateVerifier.countStateNbInExtView(extView,
- // "MASTER");
- // int slaveCnt = ClusterStateVerifier.countStateNbInExtView(extView, "SLAVE");
- // if (masterCnt == 1200)
- // {
- // System.out.println(timestamp + ": externalView " + extView.getResourceName()
- // + " has " + masterCnt + " MASTER, " + slaveCnt + " SLAVE");
- // }
- // }
- } else if (inputLine.indexOf("/" + clusterName + "/") != -1
- && inputLine.indexOf("/MESSAGES/") != -1) {
- String type = getAttributeValue(inputLine, "type:");
-
- if (type.equals("create")) {
- ZNRecord record = getZNRecord(inputLine);
- Message msg = new Message(record);
- String sendSession = getAttributeValue(inputLine, "session:");
- if (sendSession.equals(leaderSession) && msg.getMsgType().equals("STATE_TRANSITION")
- && msg.getMsgState() == MessageState.NEW) {
- // sendMessageLines.add(inputLine);
- stats.msgSentCount++;
-
- if (msg.getFromState().equals("OFFLINE") && msg.getToState().equals("SLAVE")) {
- stats.msgSentCount_O2S++;
- } else if (msg.getFromState().equals("SLAVE") && msg.getToState().equals("MASTER")) {
- stats.msgSentCount_S2M++;
- } else if (msg.getFromState().equals("MASTER") && msg.getToState().equals("SLAVE")) {
- stats.msgSentCount_M2S++;
- }
- // System.out.println("Message create:"+new
- // Timestamp(Long.parseLong(timestamp)));
- }
-
- // pos = inputLine.indexOf("MESSAGES");
- // pos = inputLine.indexOf("data:{", pos);
- // if (pos != -1)
- // {
- //
- // byte[] msgBytes = inputLine.substring(pos + 5).getBytes();
- // ZNRecord record = (ZNRecord) _deserializer.deserialize(msgBytes);
- // Message msg = new Message(record);
- // MessageState msgState = msg.getMsgState();
- // String msgType = msg.getMsgType();
- // if (msgType.equals("STATE_TRANSITION") && msgState == MessageState.NEW)
- // {
- // if (!msgs.containsKey(msg.getMsgId()))
- // {
- // msgs.put(msg.getMsgId(), new MsgItem(Long.parseLong(timestamp), msg));
- // }
- // else
- // {
- // LOG.error("msg: " + msg.getMsgId() + " already sent");
- // }
- //
- // System.out.println(timestamp + ": sendMsg " + msg.getPartitionName() + "("
- // + msg.getFromState() + "->" + msg.getToState() + ") to "
- // + msg.getTgtName() + ", size: " + msgBytes.length);
- // }
- // }
- } else if (type.equals("setData")) {
- stats.msgModifyCount++;
- // pos = inputLine.indexOf("MESSAGES");
- // pos = inputLine.indexOf("data:{", pos);
- // if (pos != -1)
- // {
- //
- // byte[] msgBytes = inputLine.substring(pos + 5).getBytes();
- // ZNRecord record = (ZNRecord) _deserializer.deserialize(msgBytes);
- // Message msg = new Message(record);
- // MessageState msgState = msg.getMsgState();
- // String msgType = msg.getMsgType();
- // if (msgType.equals("STATE_TRANSITION") && msgState == MessageState.READ)
- // {
- // if (!msgs.containsKey(msg.getMsgId()))
- // {
- // LOG.error("msg: " + msg.getMsgId() + " never sent");
- // }
- // else
- // {
- // MsgItem msgItem = msgs.get(msg.getMsgId());
- // if (msgItem.readTime == 0)
- // {
- // msgItem.readTime = Long.parseLong(timestamp);
- // msgs.put(msg.getMsgId(), msgItem);
- // // System.out.println(timestamp + ": readMsg " + msg.getPartitionName()
- // // + "("
- // // + msg.getFromState() + "->" + msg.getToState() + ") to "
- // // + msg.getTgtName() + ", latency: " + (msgItem.readTime -
- // // msgItem.sendTime));
- // }
- // }
- //
- // }
- // }
- } else if (type.equals("delete")) {
- stats.msgDeleteCount++;
- // String msgId = path.substring(path.lastIndexOf('/') + 1);
- // if (msgs.containsKey(msgId))
- // {
- // MsgItem msgItem = msgs.get(msgId);
- // Message msg = msgItem.msg;
- // msgItem.deleteTime = Long.parseLong(timestamp);
- // msgs.put(msgId, msgItem);
- // msgItem.latency = msgItem.deleteTime - msgItem.sendTime;
- // System.out.println(timestamp + ": delMsg " + msg.getPartitionName() + "("
- // + msg.getFromState() + "->" + msg.getToState() + ") to "
- // + msg.getTgtName() + ", latency: " + msgItem.latency);
- // }
- // else
- // {
- // // messages other than STATE_TRANSITION message
- // // LOG.error("msg: " + msgId + " never sent");
- // }
- }
- }
- } // end of [br.readLine()) != null]
- }
- }
-}
[2/3] git commit: [HELIX-22] Remove dependency on josql, rb=16017
Posted by ka...@apache.org.
[HELIX-22] Remove dependency on josql, rb=16017
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5019d96e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5019d96e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5019d96e
Branch: refs/heads/helix-0.6.2-release
Commit: 5019d96e233a905c5eb640f4a8b6d14af839e06d
Parents: efc1def
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Dec 4 13:00:20 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Mar 13 18:55:19 2014 -0700
----------------------------------------------------------------------
helix-core/pom.xml | 6 -
.../main/java/org/apache/helix/Criteria.java | 12 +-
.../helix/josql/ClusterJosqlQueryProcessor.java | 278 -------------------
.../josql/ZNRecordJosqlFunctionHandler.java | 78 ------
.../org/apache/helix/josql/ZNRecordRow.java | 174 ------------
.../org/apache/helix/josql/package-info.java | 23 --
.../helix/messaging/CriteriaEvaluator.java | 156 +++++++----
.../org/apache/helix/messaging/ZNRecordRow.java | 193 +++++++++++++
.../josql/TestClusterJosqlQueryProcessor.java | 102 -------
.../apache/helix/josql/TestJosqlProcessor.java | 224 ---------------
.../messaging/TestDefaultMessagingService.java | 39 ++-
pom.xml | 5 -
12 files changed, 335 insertions(+), 955 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index cc6b86e..ee09bf9 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -45,7 +45,6 @@ under the License.
org.apache.zookeeper.txn*;resolution:=optional,
org.apache.zookeeper*;version="[3.3,4)",
org.codehaus.jackson*;version="[1.8,2)",
- org.josql*;version="[1.5,2)",
org.restlet;version="[2.1.4,3]",
*
</osgi.import>
@@ -116,11 +115,6 @@ under the License.
<version>0.1</version>
</dependency>
<dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-josql</artifactId>
- <version>2.12.1</version>
- </dependency>
- <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/main/java/org/apache/helix/Criteria.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/Criteria.java b/helix-core/src/main/java/org/apache/helix/Criteria.java
index da2b36e..75781e1 100644
--- a/helix-core/src/main/java/org/apache/helix/Criteria.java
+++ b/helix-core/src/main/java/org/apache/helix/Criteria.java
@@ -39,16 +39,16 @@ public class Criteria {
*/
boolean sessionSpecific;
/**
- * applicable only in case PARTICIPANT use * to broadcast to all instances
+ * applicable only in case PARTICIPANT use % to broadcast to all instances
*/
String instanceName = "";
/**
- * Name of the resource. Use * to send message to all resources
+ * Name of the resource. Use % to send message to all resources
* owned by an instance.
*/
String resourceName = "";
/**
- * Resource partition. Use * to send message to all partitions of a given
+ * Resource partition. Use % to send message to all partitions of a given
* resource
*/
String partitionName = "";
@@ -140,7 +140,7 @@ public class Criteria {
/**
* Set the name of the destination instance (PARTICIPANT only)
- * @param instanceName the instance name or * for all instances
+ * @param instanceName the instance name or % for all instances
*/
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
@@ -156,7 +156,7 @@ public class Criteria {
/**
* Set the destination resource name
- * @param resourceName the resource name or * for all resources
+ * @param resourceName the resource name or % for all resources
*/
public void setResource(String resourceName) {
this.resourceName = resourceName;
@@ -172,7 +172,7 @@ public class Criteria {
/**
* Set the destination partition name
- * @param partitionName the partition name, or * for all partitions of a resource
+ * @param partitionName the partition name, or % for all partitions of a resource
*/
public void setPartition(String partitionName) {
this.partitionName = partitionName;
http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java b/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java
deleted file mode 100644
index 8f11de5..0000000
--- a/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java
+++ /dev/null
@@ -1,278 +0,0 @@
-package org.apache.helix.josql;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
-import org.apache.log4j.Logger;
-import org.josql.Query;
-import org.josql.QueryExecutionException;
-import org.josql.QueryParseException;
-import org.josql.QueryResults;
-
-public class ClusterJosqlQueryProcessor {
- public static final String PARTITIONS = "PARTITIONS";
- public static final String FLATTABLE = ".Table";
-
- HelixManager _manager;
- private static Logger _logger = Logger.getLogger(ClusterJosqlQueryProcessor.class);
-
- public ClusterJosqlQueryProcessor(HelixManager manager) {
- _manager = manager;
- }
-
- String parseFromTarget(String sql) {
- // We need to find out the "FROM" target, and replace it with liveInstances
- // / partitions etc
- int fromIndex = sql.indexOf("FROM");
- if (fromIndex == -1) {
- throw new HelixException("Query must contain FROM target. Query: " + sql);
- }
- // Per JoSql, select FROM <target> the target must be a object class that
- // corresponds to a "table row"
- // In out case, the row is always a ZNRecord
-
- int nextSpace = sql.indexOf(" ", fromIndex);
- while (sql.charAt(nextSpace) == ' ') {
- nextSpace++;
- }
- int nextnextSpace = sql.indexOf(" ", nextSpace);
- if (nextnextSpace == -1) {
- nextnextSpace = sql.length();
- }
- String fromTarget = sql.substring(nextSpace, nextnextSpace).trim();
-
- if (fromTarget.length() == 0) {
- throw new HelixException("FROM target in the query cannot be empty. Query: " + sql);
- }
- return fromTarget;
- }
-
- public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
- List<Object> additionalFunctionHandlers, List queryTarget) throws QueryParseException,
- QueryExecutionException {
- Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
-
- josqlQuery.parse(josql);
- QueryResults qr = josqlQuery.execute(queryTarget);
-
- return qr.getResults();
- }
-
- Query prepareQuery(Map<String, Object> bindVariables, List<Object> additionalFunctionHandlers) {
- // DataAccessor accessor = _manager.getDataAccessor();
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
- // Get all the ZNRecords in the cluster and set them as bind variables
- Builder keyBuilder = accessor.keyBuilder();
- // List<ZNRecord> instanceConfigs = accessor.getChildValues(PropertyType.CONFIGS,
- // ConfigScopeProperty.PARTICIPANT.toString());
-
- List<ZNRecord> instanceConfigs =
- HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs()));
-
- List<ZNRecord> liveInstances =
- HelixProperty.convertToList(accessor.getChildValues(keyBuilder.liveInstances()));
- List<ZNRecord> stateModelDefs =
- HelixProperty.convertToList(accessor.getChildValues(keyBuilder.stateModelDefs()));
-
- // Idealstates are stored in a map from resource name to idealState ZNRecord
- List<ZNRecord> idealStateList =
- HelixProperty.convertToList(accessor.getChildValues(keyBuilder.idealStates()));
-
- Map<String, ZNRecord> idealStatesMap = new HashMap<String, ZNRecord>();
- for (ZNRecord idealState : idealStateList) {
- idealStatesMap.put(idealState.getId(), idealState);
- }
- // Make up the partition list: for selecting partitions
- List<ZNRecord> partitions = new ArrayList<ZNRecord>();
- for (ZNRecord idealState : idealStateList) {
- for (String partitionName : idealState.getMapFields().keySet()) {
- partitions.add(new ZNRecord(partitionName));
- }
- }
-
- List<ZNRecord> externalViewList =
- HelixProperty.convertToList(accessor.getChildValues(keyBuilder.externalViews()));
- // ExternalViews are stored in a map from resource name to idealState
- // ZNRecord
- Map<String, ZNRecord> externalViewMap = new HashMap<String, ZNRecord>();
- for (ZNRecord externalView : externalViewList) {
- externalViewMap.put(externalView.getId(), externalView);
- }
- // Map from instance name to a map from resource to current state ZNRecord
- Map<String, Map<String, ZNRecord>> currentStatesMap =
- new HashMap<String, Map<String, ZNRecord>>();
- // Map from instance name to a list of combined flat ZNRecordRow
- Map<String, List<ZNRecordRow>> flatCurrentStateMap = new HashMap<String, List<ZNRecordRow>>();
-
- for (ZNRecord instance : liveInstances) {
- String host = instance.getId();
- String sessionId = instance.getSimpleField(LiveInstanceProperty.SESSION_ID.toString());
- Map<String, ZNRecord> currentStates = new HashMap<String, ZNRecord>();
- List<ZNRecord> instanceCurrentStateList = new ArrayList<ZNRecord>();
- for (ZNRecord idealState : idealStateList) {
- String resourceName = idealState.getId();
-
- HelixProperty property =
- accessor.getProperty(keyBuilder.currentState(host, sessionId, resourceName));
- ZNRecord currentState = null;
- if (property == null) {
- _logger.warn("Resource " + resourceName + " has null currentState");
- currentState = new ZNRecord(resourceName);
- } else {
- currentState = property.getRecord();
- }
- currentStates.put(resourceName, currentState);
- instanceCurrentStateList.add(currentState);
- }
- currentStatesMap.put(host, currentStates);
- flatCurrentStateMap.put(host, ZNRecordRow.flatten(instanceCurrentStateList));
- }
- Query josqlQuery = new Query();
-
- // Set the default bind variables
- josqlQuery.setVariable(
- PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString(),
- instanceConfigs);
- josqlQuery.setVariable(PropertyType.IDEALSTATES.toString(), idealStatesMap);
- josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString(), liveInstances);
- josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString(), stateModelDefs);
- josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString(), externalViewMap);
- josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString(), currentStatesMap);
- josqlQuery.setVariable(PARTITIONS, partitions);
-
- // Flat version of ZNRecords
- josqlQuery.setVariable(
- PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString()
- + FLATTABLE, ZNRecordRow.flatten(instanceConfigs));
- josqlQuery.setVariable(PropertyType.IDEALSTATES.toString() + FLATTABLE,
- ZNRecordRow.flatten(idealStateList));
- josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString() + FLATTABLE,
- ZNRecordRow.flatten(liveInstances));
- josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString() + FLATTABLE,
- ZNRecordRow.flatten(stateModelDefs));
- josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString() + FLATTABLE,
- ZNRecordRow.flatten(externalViewList));
- josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString() + FLATTABLE,
- flatCurrentStateMap.values());
- josqlQuery.setVariable(PARTITIONS + FLATTABLE, ZNRecordRow.flatten(partitions));
- // Set additional bind variables
- if (bindVariables != null) {
- for (String key : bindVariables.keySet()) {
- josqlQuery.setVariable(key, bindVariables.get(key));
- }
- }
-
- josqlQuery.addFunctionHandler(new ZNRecordJosqlFunctionHandler());
- josqlQuery.addFunctionHandler(new ZNRecordRow());
- josqlQuery.addFunctionHandler(new Integer(0));
- if (additionalFunctionHandlers != null) {
- for (Object functionHandler : additionalFunctionHandlers) {
- josqlQuery.addFunctionHandler(functionHandler);
- }
- }
- return josqlQuery;
- }
-
- public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
- List<Object> additionalFunctionHandlers) throws QueryParseException, QueryExecutionException {
- Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
-
- // Per JoSql, select FROM <target> the target must be a object class that
- // corresponds to a "table row",
- // while the table (list of Objects) are put in the query by
- // query.execute(List<Object>). In the input,
- // In out case, the row is always a ZNRecord. But in SQL, the from target is
- // a "table name".
-
- String fromTargetString = parseFromTarget(josql);
-
- List fromTargetList = null;
- Object fromTarget = null;
- if (fromTargetString.equalsIgnoreCase(PARTITIONS)) {
- fromTarget = josqlQuery.getVariable(PARTITIONS.toString());
- } else if (fromTargetString.equalsIgnoreCase(PropertyType.LIVEINSTANCES.toString())) {
- fromTarget = josqlQuery.getVariable(PropertyType.LIVEINSTANCES.toString());
- } else if (fromTargetString.equalsIgnoreCase(PropertyType.CONFIGS.toString() + "/"
- + ConfigScopeProperty.PARTICIPANT.toString())) {
- fromTarget =
- josqlQuery.getVariable(PropertyType.CONFIGS.toString() + "/"
- + ConfigScopeProperty.PARTICIPANT.toString());
- } else if (fromTargetString.equalsIgnoreCase(PropertyType.STATEMODELDEFS.toString())) {
- fromTarget = josqlQuery.getVariable(PropertyType.STATEMODELDEFS.toString());
- } else if (fromTargetString.equalsIgnoreCase(PropertyType.EXTERNALVIEW.toString())) {
- fromTarget = josqlQuery.getVariable(PropertyType.EXTERNALVIEW.toString());
- } else if (fromTargetString.equalsIgnoreCase(PropertyType.IDEALSTATES.toString())) {
- fromTarget = josqlQuery.getVariable(PropertyType.IDEALSTATES.toString());
- }
-
- else if (fromTargetString.equalsIgnoreCase(PARTITIONS + FLATTABLE)) {
- fromTarget = josqlQuery.getVariable(PARTITIONS.toString() + FLATTABLE);
- } else if (fromTargetString.equalsIgnoreCase(PropertyType.LIVEINSTANCES.toString() + FLATTABLE)) {
- fromTarget = josqlQuery.getVariable(PropertyType.LIVEINSTANCES.toString() + FLATTABLE);
- } else if (fromTargetString.equalsIgnoreCase(PropertyType.CONFIGS.toString() + "/"
- + ConfigScopeProperty.PARTICIPANT.toString() + FLATTABLE)) {
- fromTarget =
- josqlQuery.getVariable(PropertyType.CONFIGS.toString() + "/"
- + ConfigScopeProperty.PARTICIPANT.toString() + FLATTABLE);
- } else if (fromTargetString
- .equalsIgnoreCase(PropertyType.STATEMODELDEFS.toString() + FLATTABLE)) {
- fromTarget = josqlQuery.getVariable(PropertyType.STATEMODELDEFS.toString() + FLATTABLE);
- } else if (fromTargetString.equalsIgnoreCase(PropertyType.EXTERNALVIEW.toString() + FLATTABLE)) {
- fromTarget = josqlQuery.getVariable(PropertyType.EXTERNALVIEW.toString() + FLATTABLE);
- } else if (fromTargetString.equalsIgnoreCase(PropertyType.IDEALSTATES.toString() + FLATTABLE)) {
- fromTarget = josqlQuery.getVariable(PropertyType.IDEALSTATES.toString() + FLATTABLE);
- } else {
- throw new HelixException(
- "Unknown query target "
- + fromTargetString
- + ". Target should be PARTITIONS, LIVEINSTANCES, CONFIGS, STATEMODELDEFS, IDEALSTATES, EXTERNALVIEW, and corresponding flat Tables");
- }
-
- fromTargetList =
- fromTargetString.endsWith(FLATTABLE) ? ((List<ZNRecordRow>) fromTarget)
- : ((List<ZNRecord>) fromTarget);
-
- // Per JoSql, select FROM <target> the target must be a object class that
- // corresponds to a "table row"
- // In out case, the row is always a ZNRecord
- josql =
- josql.replaceFirst(
- fromTargetString,
- fromTargetString.endsWith(FLATTABLE) ? ZNRecordRow.class.getName() : ZNRecord.class
- .getName());
- josqlQuery.parse(josql);
- QueryResults qr = josqlQuery.execute(fromTargetList);
- return qr.getResults();
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java b/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java
deleted file mode 100644
index f2fbddb..0000000
--- a/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package org.apache.helix.josql;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Map;
-
-import org.apache.helix.ZNRecord;
-import org.josql.functions.AbstractFunctionHandler;
-
-public class ZNRecordJosqlFunctionHandler extends AbstractFunctionHandler {
- public boolean hasSimpleField(ZNRecord record, String fieldName, String field) {
- if (!record.getSimpleFields().containsKey(fieldName)) {
- return false;
- }
- return field.equals(record.getSimpleField(fieldName));
- }
-
- public boolean hasListField(ZNRecord record, String fieldName, String field) {
- if (!record.getListFields().containsKey(fieldName)) {
- return false;
- }
- return record.getListField(fieldName).contains(field);
- }
-
- public boolean hasMapFieldValue(ZNRecord record, String fieldName, String mapKey, String mapValue) {
- if (!record.getMapFields().containsKey(fieldName)) {
- return false;
- }
- if (record.getMapField(fieldName).containsKey(mapKey)) {
- return record.getMapField(fieldName).get(mapKey).equals(mapValue);
- }
- return false;
- }
-
- public boolean hasMapFieldKey(ZNRecord record, String fieldName, String mapKey) {
- if (!record.getMapFields().containsKey(fieldName)) {
- return false;
- }
- return record.getMapField(fieldName).containsKey(mapKey);
- }
-
- public String getMapFieldValue(ZNRecord record, String fieldName, String mapKey) {
- if (record.getMapFields().containsKey(fieldName)) {
- return record.getMapField(fieldName).get(mapKey);
- }
- return null;
- }
-
- public String getSimpleFieldValue(ZNRecord record, String key) {
- return record.getSimpleField(key);
- }
-
- public ZNRecord getZNRecordFromMap(Map<String, ZNRecord> recordMap, String key) {
- return recordMap.get(key);
- }
-
- public ZNRecord getZNRecordFromMap(Map<String, Map<String, ZNRecord>> recordMap, String key,
- String subKey) {
- return recordMap.get(key).get(subKey);
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java b/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java
deleted file mode 100644
index 108596f..0000000
--- a/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java
+++ /dev/null
@@ -1,174 +0,0 @@
-package org.apache.helix.josql;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.ZNRecord;
-
-/**
- * A Normalized form of ZNRecord
- */
-public class ZNRecordRow {
- // "Field names" in the flattened ZNRecord
- public static final String SIMPLE_KEY = "simpleKey";
- public static final String SIMPLE_VALUE = "simpleValue";
-
- public static final String LIST_KEY = "listKey";
- public static final String LIST_VALUE = "listValue";
- public static final String LIST_VALUE_INDEX = "listValueIndex";
-
- public static final String MAP_KEY = "mapKey";
- public static final String MAP_SUBKEY = "mapSubKey";
- public static final String MAP_VALUE = "mapValue";
- public static final String ZNRECORD_ID = "recordId";
- // ZNRECORD path ?
-
- final Map<String, String> _rowDataMap = new HashMap<String, String>();
-
- public ZNRecordRow() {
- _rowDataMap.put(SIMPLE_KEY, "");
- _rowDataMap.put(SIMPLE_VALUE, "");
- _rowDataMap.put(LIST_KEY, "");
- _rowDataMap.put(LIST_VALUE, "");
- _rowDataMap.put(LIST_VALUE_INDEX, "");
- _rowDataMap.put(MAP_KEY, "");
- _rowDataMap.put(MAP_SUBKEY, "");
- _rowDataMap.put(MAP_VALUE, "");
- _rowDataMap.put(ZNRECORD_ID, "");
- }
-
- public String getField(String rowField) {
- return _rowDataMap.get(rowField);
- }
-
- public void putField(String fieldName, String fieldValue) {
- _rowDataMap.put(fieldName, fieldValue);
- }
-
- public String getListValueIndex() {
- return getField(LIST_VALUE_INDEX);
- }
-
- public String getSimpleKey() {
- return getField(SIMPLE_KEY);
- }
-
- public String getSimpleValue() {
- return getField(SIMPLE_VALUE);
- }
-
- public String getListKey() {
- return getField(LIST_KEY);
- }
-
- public String getListValue() {
- return getField(LIST_VALUE);
- }
-
- public String getMapKey() {
- return getField(MAP_KEY);
- }
-
- public String getMapSubKey() {
- return getField(MAP_SUBKEY);
- }
-
- public String getMapValue() {
- return getField(MAP_VALUE);
- }
-
- public String getRecordId() {
- return getField(ZNRECORD_ID);
- }
-
- /* Josql function handlers */
- public static String getField(ZNRecordRow row, String rowField) {
- return row.getField(rowField);
- }
-
- public static List<ZNRecordRow> convertSimpleFields(ZNRecord record) {
- List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
- for (String key : record.getSimpleFields().keySet()) {
- ZNRecordRow row = new ZNRecordRow();
- row.putField(ZNRECORD_ID, record.getId());
- row.putField(SIMPLE_KEY, key);
- row.putField(SIMPLE_VALUE, record.getSimpleField(key));
- result.add(row);
- }
- return result;
- }
-
- public static List<ZNRecordRow> convertListFields(ZNRecord record) {
- List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
- for (String key : record.getListFields().keySet()) {
- int order = 0;
- for (String value : record.getListField(key)) {
- ZNRecordRow row = new ZNRecordRow();
- row.putField(ZNRECORD_ID, record.getId());
- row.putField(LIST_KEY, key);
- row.putField(LIST_VALUE, record.getSimpleField(key));
- row.putField(LIST_VALUE_INDEX, "" + order);
- order++;
- result.add(row);
- }
- }
- return result;
- }
-
- public static List<ZNRecordRow> convertMapFields(ZNRecord record) {
- List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
- for (String key0 : record.getMapFields().keySet()) {
- for (String key1 : record.getMapField(key0).keySet()) {
- ZNRecordRow row = new ZNRecordRow();
- row.putField(ZNRECORD_ID, record.getId());
- row.putField(MAP_KEY, key0);
- row.putField(MAP_SUBKEY, key1);
- row.putField(MAP_VALUE, record.getMapField(key0).get(key1));
- result.add(row);
- }
- }
- return result;
- }
-
- public static List<ZNRecordRow> flatten(ZNRecord record) {
- List<ZNRecordRow> result = convertMapFields(record);
- result.addAll(convertListFields(record));
- result.addAll(convertSimpleFields(record));
- return result;
- }
-
- public static List<ZNRecordRow> flatten(Collection<ZNRecord> recordList) {
- List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
- for (ZNRecord record : recordList) {
- result.addAll(flatten(record));
- }
- return result;
- }
-
- public static List<ZNRecordRow> getRowListFromMap(Map<String, List<ZNRecordRow>> rowMap,
- String key) {
- return rowMap.get(key);
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/main/java/org/apache/helix/josql/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/package-info.java b/helix-core/src/main/java/org/apache/helix/josql/package-info.java
deleted file mode 100644
index 550d569..0000000
--- a/helix-core/src/main/java/org/apache/helix/josql/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Jsql processor for Helix
- *
- */
-package org.apache.helix.josql;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
index 3d2569e..9ca20af 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
@@ -19,80 +19,124 @@ package org.apache.helix.messaging;
* under the License.
*/
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
import org.apache.helix.Criteria;
+import org.apache.helix.Criteria.DataSource;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.josql.ClusterJosqlQueryProcessor;
-import org.apache.helix.josql.ZNRecordRow;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
import org.apache.log4j.Logger;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
public class CriteriaEvaluator {
private static Logger logger = Logger.getLogger(CriteriaEvaluator.class);
+ /**
+ * Examine persisted data to match wildcards in {@link Criteria}
+ * @param recipientCriteria Criteria specifying the message destinations
+ * @param manager connection to the persisted data
+ * @return list of distinct matches, each a map keyed by instanceName, resourceName, partitionName and partitionState
+ */
public List<Map<String, String>> evaluateCriteria(Criteria recipientCriteria, HelixManager manager) {
- List<Map<String, String>> selected = new ArrayList<Map<String, String>>();
-
- String queryFields =
- (!recipientCriteria.getInstanceName().equals("") ? " " + ZNRecordRow.MAP_SUBKEY : " ''")
- + ","
- + (!recipientCriteria.getResource().equals("") ? " " + ZNRecordRow.ZNRECORD_ID : " ''")
- + ","
- + (!recipientCriteria.getPartition().equals("") ? " " + ZNRecordRow.MAP_KEY : " ''")
- + ","
- + (!recipientCriteria.getPartitionState().equals("") ? " " + ZNRecordRow.MAP_VALUE
- : " '' ");
-
- String matchCondition =
- ZNRecordRow.MAP_SUBKEY
- + " LIKE '"
- + (!recipientCriteria.getInstanceName().equals("") ? (recipientCriteria
- .getInstanceName() + "'") : "%' ")
- + " AND "
- + ZNRecordRow.ZNRECORD_ID
- + " LIKE '"
- + (!recipientCriteria.getResource().equals("") ? (recipientCriteria.getResource() + "'")
- : "%' ")
- + " AND "
- + ZNRecordRow.MAP_KEY
- + " LIKE '"
- + (!recipientCriteria.getPartition().equals("") ? (recipientCriteria.getPartition() + "'")
- : "%' ")
- + " AND "
- + ZNRecordRow.MAP_VALUE
- + " LIKE '"
- + (!recipientCriteria.getPartitionState().equals("") ? (recipientCriteria
- .getPartitionState() + "'") : "%' ") + " AND " + ZNRecordRow.MAP_SUBKEY
- + " IN ((SELECT [*]id FROM :LIVEINSTANCES))";
+ // get the data
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ Set<Map<String, String>> selected = Sets.newHashSet();
+ List<HelixProperty> properties;
+ if (recipientCriteria.getDataSource() == DataSource.EXTERNALVIEW) {
+ properties = accessor.getChildValues(keyBuilder.externalViews());
+ } else if (recipientCriteria.getDataSource() == DataSource.IDEALSTATES) {
+ properties = accessor.getChildValues(keyBuilder.idealStates());
+ } else {
+ return Collections.emptyList();
+ }
- String queryTarget =
- recipientCriteria.getDataSource().toString() + ClusterJosqlQueryProcessor.FLATTABLE;
+ // flatten the data
+ List<ZNRecordRow> allRows = ZNRecordRow.flatten(HelixProperty.convertToList(properties));
- String josql =
- "SELECT DISTINCT " + queryFields + " FROM " + queryTarget + " WHERE " + matchCondition;
- ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(manager);
- List<Object> result = new ArrayList<Object>();
- try {
- logger.info("JOSQL query: " + josql);
- result = p.runJoSqlQuery(josql, null, null);
- } catch (Exception e) {
- logger.error("", e);
- return selected;
+ // save the matches
+ Set<String> liveParticipants = accessor.getChildValuesMap(keyBuilder.liveInstances()).keySet();
+ List<ZNRecordRow> result = Lists.newArrayList();
+ for (ZNRecordRow row : allRows) {
+ if (rowMatches(recipientCriteria, row) && liveParticipants.contains(row.getMapSubKey())) {
+ result.add(row);
+ }
}
- for (Object o : result) {
+ // deduplicate and convert the matches into the required format
+ for (ZNRecordRow row : result) {
Map<String, String> resultRow = new HashMap<String, String>();
- List<Object> row = (List<Object>) o;
- resultRow.put("instanceName", (String) (row.get(0)));
- resultRow.put("resourceName", (String) (row.get(1)));
- resultRow.put("partitionName", (String) (row.get(2)));
- resultRow.put("partitionState", (String) (row.get(3)));
+ resultRow.put("instanceName",
+ !recipientCriteria.getInstanceName().equals("") ? row.getMapSubKey() : "");
+ resultRow.put("resourceName", !recipientCriteria.getResource().equals("") ? row.getRecordId()
+ : "");
+ resultRow.put("partitionName", !recipientCriteria.getPartition().equals("") ? row.getMapKey()
+ : "");
+ resultRow.put("partitionState",
+ !recipientCriteria.getPartitionState().equals("") ? row.getMapValue() : "");
selected.add(resultRow);
}
- logger.info("JOSQL query return " + selected.size() + " rows");
- return selected;
+ logger.info("Query returned " + selected.size() + " rows");
+ return Lists.newArrayList(selected);
+ }
+
+ /**
+ * Check if a given row matches the specified criteria
+ * @param criteria the criteria
+ * @param row row of currently persisted data
+ * @return true if it matches, false otherwise
+ */
+ private boolean rowMatches(Criteria criteria, ZNRecordRow row) {
+ String instanceName = normalizePattern(criteria.getInstanceName());
+ String resourceName = normalizePattern(criteria.getResource());
+ String partitionName = normalizePattern(criteria.getPartition());
+ String partitionState = normalizePattern(criteria.getPartitionState());
+ return stringMatches(instanceName, row.getMapSubKey())
+ && stringMatches(resourceName, row.getRecordId())
+ && stringMatches(partitionName, row.getMapKey())
+ && stringMatches(partitionState, row.getMapValue());
+ }
+
+ /**
+ * Convert an SQL like expression into a Java matches expression.
+ * A null, empty, or "*" pattern is treated as "%" (match anything). Regex metacharacters
+ * are backslash-escaped, then '_' becomes '.' and '%' becomes ".*?".
+ * @param pattern SQL like match pattern (i.e. contains '%'s and '_'s)
+ * @return lower-cased Java matches expression (i.e. contains ".*?"s and '.'s)
+ */
+ private String normalizePattern(String pattern) {
+ if (pattern == null || pattern.equals("") || pattern.equals("*")) {
+ pattern = "%";
+ }
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < pattern.length(); i++) {
+ char ch = pattern.charAt(i);
+ if ("[](){}.*+?$^|#\\".indexOf(ch) != -1) {
+ // escape any reserved characters
+ builder.append("\\");
+ }
+ // append the character
+ builder.append(ch);
+ }
+ // Lower-case with a fixed locale: the default-locale overload maps 'I' to a dotless i
+ // (U+0131) under e.g. a Turkish locale, which ASCII case-insensitive matching does not
+ // equate with 'I', so a pattern such as "OFFLINE" would stop matching.
+ pattern =
+ builder.toString().toLowerCase(java.util.Locale.ROOT).replace("_", ".").replace("%", ".*?");
+ return pattern;
+ }
+
+ /**
+ * Check if a string matches a pattern
+ * @param pattern pattern allowed by Java regex matching
+ * @param value the string to check
+ * @return true if they match, false otherwise
+ */
+ private boolean stringMatches(String pattern, String value) {
+ Pattern p = Pattern.compile(pattern, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
+ return p.matcher(value).matches();
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/main/java/org/apache/helix/messaging/ZNRecordRow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/ZNRecordRow.java b/helix-core/src/main/java/org/apache/helix/messaging/ZNRecordRow.java
new file mode 100644
index 0000000..5a2effd
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/ZNRecordRow.java
@@ -0,0 +1,193 @@
+package org.apache.helix.messaging;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+
+/**
+ * A Normalized form of ZNRecord
+ */
+public class ZNRecordRow {
+ // "Field names" in the flattened ZNRecord
+ public static final String SIMPLE_KEY = "simpleKey";
+ public static final String SIMPLE_VALUE = "simpleValue";
+
+ public static final String LIST_KEY = "listKey";
+ public static final String LIST_VALUE = "listValue";
+ public static final String LIST_VALUE_INDEX = "listValueIndex";
+
+ public static final String MAP_KEY = "mapKey";
+ public static final String MAP_SUBKEY = "mapSubKey";
+ public static final String MAP_VALUE = "mapValue";
+ public static final String ZNRECORD_ID = "recordId";
+ // ZNRECORD path ?
+
+ final Map<String, String> _rowDataMap = new HashMap<String, String>();
+
+ public ZNRecordRow() {
+ _rowDataMap.put(SIMPLE_KEY, "");
+ _rowDataMap.put(SIMPLE_VALUE, "");
+ _rowDataMap.put(LIST_KEY, "");
+ _rowDataMap.put(LIST_VALUE, "");
+ _rowDataMap.put(LIST_VALUE_INDEX, "");
+ _rowDataMap.put(MAP_KEY, "");
+ _rowDataMap.put(MAP_SUBKEY, "");
+ _rowDataMap.put(MAP_VALUE, "");
+ _rowDataMap.put(ZNRECORD_ID, "");
+ }
+
+ public String getField(String rowField) {
+ return _rowDataMap.get(rowField);
+ }
+
+ public void putField(String fieldName, String fieldValue) {
+ _rowDataMap.put(fieldName, fieldValue);
+ }
+
+ public String getListValueIndex() {
+ return getField(LIST_VALUE_INDEX);
+ }
+
+ public String getSimpleKey() {
+ return getField(SIMPLE_KEY);
+ }
+
+ public String getSimpleValue() {
+ return getField(SIMPLE_VALUE);
+ }
+
+ public String getListKey() {
+ return getField(LIST_KEY);
+ }
+
+ public String getListValue() {
+ return getField(LIST_VALUE);
+ }
+
+ public String getMapKey() {
+ return getField(MAP_KEY);
+ }
+
+ public String getMapSubKey() {
+ return getField(MAP_SUBKEY);
+ }
+
+ public String getMapValue() {
+ return getField(MAP_VALUE);
+ }
+
+ public String getRecordId() {
+ return getField(ZNRECORD_ID);
+ }
+
+ /* Static helpers (formerly registered as Josql function handlers) */
+ public static String getField(ZNRecordRow row, String rowField) {
+ return row.getField(rowField);
+ }
+
+ public static List<ZNRecordRow> convertSimpleFields(ZNRecord record) {
+ List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
+ for (String key : record.getSimpleFields().keySet()) {
+ ZNRecordRow row = new ZNRecordRow();
+ row.putField(ZNRECORD_ID, record.getId());
+ row.putField(SIMPLE_KEY, key);
+ row.putField(SIMPLE_VALUE, record.getSimpleField(key));
+ result.add(row);
+ }
+ return result;
+ }
+
+ /**
+ * Flatten the list fields of a record into one row per list element.
+ * Each row carries the record id, the list key, the element value, and the element's
+ * zero-based position within its list.
+ * @param record the record whose list fields are flattened
+ * @return rows for every element of every list field
+ */
+ public static List<ZNRecordRow> convertListFields(ZNRecord record) {
+ List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
+ for (String key : record.getListFields().keySet()) {
+ int order = 0;
+ for (String value : record.getListField(key)) {
+ ZNRecordRow row = new ZNRecordRow();
+ row.putField(ZNRECORD_ID, record.getId());
+ row.putField(LIST_KEY, key);
+ // store the list element itself; a list key is not a simple-field key
+ row.putField(LIST_VALUE, value);
+ row.putField(LIST_VALUE_INDEX, "" + order);
+ order++;
+ result.add(row);
+ }
+ }
+ return result;
+ }
+
+ public static List<ZNRecordRow> convertMapFields(ZNRecord record) {
+ List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
+ for (String key0 : record.getMapFields().keySet()) {
+ for (String key1 : record.getMapField(key0).keySet()) {
+ ZNRecordRow row = new ZNRecordRow();
+ row.putField(ZNRECORD_ID, record.getId());
+ row.putField(MAP_KEY, key0);
+ row.putField(MAP_SUBKEY, key1);
+ row.putField(MAP_VALUE, record.getMapField(key0).get(key1));
+ result.add(row);
+ }
+ }
+ return result;
+ }
+
+ public static List<ZNRecordRow> flatten(ZNRecord record) {
+ List<ZNRecordRow> result = convertMapFields(record);
+ result.addAll(convertListFields(record));
+ result.addAll(convertSimpleFields(record));
+ return result;
+ }
+
+ public static List<ZNRecordRow> flatten(Collection<ZNRecord> recordList) {
+ List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
+ for (ZNRecord record : recordList) {
+ result.addAll(flatten(record));
+ }
+ return result;
+ }
+
+ public static List<ZNRecordRow> getRowListFromMap(Map<String, List<ZNRecordRow>> rowMap,
+ String key) {
+ return rowMap.get(key);
+ }
+
+ @Override
+ public String toString() {
+ return _rowDataMap.toString();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof ZNRecordRow) {
+ ZNRecordRow that = (ZNRecordRow) other;
+ return this._rowDataMap.equals(that._rowDataMap);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return _rowDataMap.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java b/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java
deleted file mode 100644
index 8090201..0000000
--- a/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package org.apache.helix.josql;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.helix.Criteria;
-import org.apache.helix.InstanceType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.josql.ZNRecordJosqlFunctionHandler;
-import org.apache.helix.josql.ZNRecordRow;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
-import org.apache.helix.tools.DefaultIdealStateCalculator;
-import org.josql.Query;
-import org.josql.QueryExecutionException;
-import org.josql.QueryParseException;
-import org.josql.QueryResults;
-import org.testng.annotations.Test;
-
-public class TestClusterJosqlQueryProcessor {
- @Test(groups = {
- "unitTest"
- })
- public void queryClusterDataSample() {
- List<ZNRecord> liveInstances = new ArrayList<ZNRecord>();
- Map<String, ZNRecord> liveInstanceMap = new HashMap<String, ZNRecord>();
- List<String> instances = new ArrayList<String>();
- for (int i = 0; i < 5; i++) {
- String instance = "localhost_" + (12918 + i);
- instances.add(instance);
- ZNRecord metaData = new ZNRecord(instance);
- metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), UUID.randomUUID()
- .toString());
- metaData.setSimpleField("SCN", "" + (10 - i));
- liveInstances.add(metaData);
- liveInstanceMap.put(instance, metaData);
- }
-
- // liveInstances.remove(0);
- ZNRecord externalView =
- DefaultIdealStateCalculator.calculateIdealState(instances, 21, 3, "TestDB", "MASTER",
- "SLAVE");
-
- Criteria criteria = new Criteria();
- criteria.setInstanceName("%");
- criteria.setResource("TestDB");
- criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- criteria.setPartition("TestDB_2%");
- criteria.setPartitionState("SLAVE");
-
- String josql =
- " SELECT DISTINCT mapSubKey AS 'subkey', mapValue AS 'mapValue' , getSimpleFieldValue(getZNRecordFromMap(:LIVEINSTANCESMAP, mapSubKey), 'SCN') AS 'SCN'"
- + " FROM org.apache.helix.josql.ZNRecordRow "
- + " WHERE mapKey LIKE 'TestDB_2%' "
- + " AND mapSubKey LIKE '%' "
- + " AND mapValue LIKE 'SLAVE' "
- + " AND mapSubKey IN ((SELECT [*]id FROM :LIVEINSTANCES)) "
- + " ORDER BY parseInt(getSimpleFieldValue(getZNRecordFromMap(:LIVEINSTANCESMAP, mapSubKey), 'SCN'))";
-
- Query josqlQuery = new Query();
- josqlQuery.setVariable("LIVEINSTANCES", liveInstances);
- josqlQuery.setVariable("LIVEINSTANCESMAP", liveInstanceMap);
- josqlQuery.addFunctionHandler(new ZNRecordRow());
- josqlQuery.addFunctionHandler(new ZNRecordJosqlFunctionHandler());
- josqlQuery.addFunctionHandler(new Integer(0));
- try {
- josqlQuery.parse(josql);
- QueryResults qr = josqlQuery.execute(ZNRecordRow.convertMapFields(externalView));
- @SuppressWarnings({
- "unchecked", "unused"
- })
- List<Object> result = qr.getResults();
-
- } catch (QueryParseException e) {
- e.printStackTrace();
- } catch (QueryExecutionException e) {
- e.printStackTrace();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java b/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
deleted file mode 100644
index c70f984..0000000
--- a/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
+++ /dev/null
@@ -1,224 +0,0 @@
-package org.apache.helix.josql;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.ZkStandAloneCMTestBase;
-import org.apache.helix.josql.ClusterJosqlQueryProcessor;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestJosqlProcessor extends ZkStandAloneCMTestBase {
- @Test(groups = {
- "integrationTest"
- })
- public void testJosqlQuery() throws Exception {
- HelixManager manager = _participants[0];
- // ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
-
- // Find the instance name that contains partition TestDB_2 and state is 'MASTER'
- String SQL =
- "SELECT id "
- + "FROM LIVEINSTANCES "
- + "WHERE getMapFieldValue( getZNRecordFromMap(:IDEALSTATES , 'TestDB'), :partitionName, :_currObj.id)='MASTER'";
- Map<String, Object> bindVariables = new HashMap<String, Object>();
- bindVariables.put("partitionName", "TestDB_2");
-
- ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(manager);
- List<Object> result = p.runJoSqlQuery(SQL, bindVariables, null);
-
- Assert.assertEquals(result.size(), 1);
- List<Object> firstList = (List<Object>) result.get(0);
- Assert.assertTrue(((String) (firstList.get(0))).equalsIgnoreCase("localhost_12921"));
-
- // Find the live instances names that hosts Partition TestDB_10 according to idealstate
-
- SQL =
- "SELECT id "
- + "FROM LIVEINSTANCES "
- + "WHERE hasMapFieldKey( getZNRecordFromMap(:IDEALSTATES, 'TestDB'), :partitionName, :_currObj.id)='true'";
- p = new ClusterJosqlQueryProcessor(manager);
- bindVariables.put("partitionName", "TestDB_10");
- result = p.runJoSqlQuery(SQL, bindVariables, null);
-
- Assert.assertEquals(result.size(), 3);
- Set<String> hosts = new HashSet<String>();
- for (Object o : result) {
- String val = (String) ((List<Object>) o).get(0);
- hosts.add(val);
- }
- Assert.assertTrue(hosts.contains("localhost_12918"));
- Assert.assertTrue(hosts.contains("localhost_12920"));
- Assert.assertTrue(hosts.contains("localhost_12921"));
-
- // Find the partitions on host localhost_12919 and is on MASTER state
- SQL =
- "SELECT id "
- + "FROM PARTITIONS "
- + "WHERE getMapFieldValue( getZNRecordFromMap(:EXTERNALVIEW, 'TestDB'), id, :instanceName)='MASTER'";
- p = new ClusterJosqlQueryProcessor(manager);
- bindVariables.clear();
- bindVariables.put("instanceName", "localhost_12919");
- result = p.runJoSqlQuery(SQL, bindVariables, null);
-
- Assert.assertEquals(result.size(), 4);
- Set<String> partitions = new HashSet<String>();
- for (Object o : result) {
- String val = (String) ((List<Object>) o).get(0);
- partitions.add(val);
- }
- Assert.assertTrue(partitions.contains("TestDB_6"));
- Assert.assertTrue(partitions.contains("TestDB_7"));
- Assert.assertTrue(partitions.contains("TestDB_9"));
- Assert.assertTrue(partitions.contains("TestDB_14"));
-
- // Find the partitions on host localhost_12919 and is on MASTER state
- // Same as above but according to currentstates
- SQL =
- "SELECT id "
- + "FROM PARTITIONS "
- + "WHERE getMapFieldValue( getZNRecordFromMap(:CURRENTSTATES, :instanceName, 'TestDB'), :_currObj.id, :mapFieldKey)=:partitionState";
-
- p = new ClusterJosqlQueryProcessor(manager);
- bindVariables.clear();
- bindVariables.put("instanceName", "localhost_12919");
- bindVariables.put("mapFieldKey", "CURRENT_STATE");
- bindVariables.put("partitionState", "MASTER");
-
- result = p.runJoSqlQuery(SQL, bindVariables, null);
-
- Assert.assertEquals(result.size(), 4);
- partitions.clear();
- partitions = new HashSet<String>();
- for (Object o : result) {
- String val = (String) ((List<Object>) o).get(0);
- partitions.add(val);
- }
- Assert.assertTrue(partitions.contains("TestDB_6"));
- Assert.assertTrue(partitions.contains("TestDB_7"));
- Assert.assertTrue(partitions.contains("TestDB_9"));
- Assert.assertTrue(partitions.contains("TestDB_14"));
-
- // get node name that hosts a certain partition with certain state
-
- SQL =
- "SELECT id "
- + "FROM LIVEINSTANCES "
- + "WHERE getMapFieldValue( getZNRecordFromMap(:CURRENTSTATES, id, 'TestDB'), :partitionName, :mapFieldKey)=:partitionState";
-
- p = new ClusterJosqlQueryProcessor(manager);
- bindVariables.clear();
- bindVariables.put("partitionName", "TestDB_8");
- bindVariables.put("mapFieldKey", "CURRENT_STATE");
- bindVariables.put("partitionState", "SLAVE");
-
- result = p.runJoSqlQuery(SQL, bindVariables, null);
-
- Assert.assertEquals(result.size(), 2);
- partitions.clear();
- partitions = new HashSet<String>();
- for (Object o : result) {
- String val = (String) ((List<Object>) o).get(0);
- partitions.add(val);
- }
- Assert.assertTrue(partitions.contains("localhost_12918"));
- Assert.assertTrue(partitions.contains("localhost_12922"));
- }
-
- @Test(groups = {
- "unitTest"
- })
- public void parseFromTarget() {
- ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(null);
- String sql = "SELECT id " + "FROM LIVEINSTANCES ";
- String from = p.parseFromTarget(sql);
- Assert.assertTrue(from.equals("LIVEINSTANCES"));
-
- sql = "SELECT id " + "FROM LIVEINSTANCES WHERE 1=2";
-
- from = p.parseFromTarget(sql);
- Assert.assertTrue(from.equals("LIVEINSTANCES"));
-
- sql = "SELECT id " + "FROM LIVEINSTANCES";
-
- from = p.parseFromTarget(sql);
- Assert.assertTrue(from.equals("LIVEINSTANCES"));
-
- sql = "SELECT id " + " LIVEINSTANCES where tt=00";
- boolean exceptionThrown = false;
- try {
- from = p.parseFromTarget(sql);
- } catch (HelixException e) {
- exceptionThrown = true;
- }
- Assert.assertTrue(exceptionThrown);
- }
-
- @Test(groups = ("unitTest"))
- public void testOrderby() throws Exception {
- HelixManager manager = _participants[0];
- // ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
-
- Map<String, ZNRecord> scnMap = new HashMap<String, ZNRecord>();
- for (int i = 0; i < NODE_NR; i++) {
- String instance = "localhost_" + (12918 + i);
- ZNRecord metaData = new ZNRecord(instance);
- metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), UUID.randomUUID()
- .toString());
- metaData.setMapField("SCN", new HashMap<String, String>());
- for (int j = 0; j < _PARTITIONS; j++) {
- metaData.getMapField("SCN").put(TEST_DB + "_" + j, "" + i);
- }
- scnMap.put(instance, metaData);
- }
- Map<String, Object> bindVariables = new HashMap<String, Object>();
- bindVariables.put("scnMap", scnMap);
- String SQL =
- " SELECT DISTINCT mapSubKey AS 'subkey', mapValue AS 'mapValue' , getMapFieldValue(getZNRecordFromMap(:scnMap, mapSubKey), 'SCN', mapKey) AS 'SCN'"
- + " FROM EXTERNALVIEW.Table "
- + " WHERE mapKey LIKE 'TestDB_1' "
- + " AND mapSubKey LIKE '%' "
- + " AND mapValue LIKE 'SLAVE' "
- + " AND mapSubKey IN ((SELECT [*]id FROM :LIVEINSTANCES)) "
- + " ORDER BY parseInt(getMapFieldValue(getZNRecordFromMap(:scnMap, mapSubKey), 'SCN', mapKey))";
-
- ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(manager);
- List<Object> result = p.runJoSqlQuery(SQL, bindVariables, null);
- int prevSCN = -1;
- for (Object row : result) {
- List<String> stringRow = (List<String>) row;
- Assert.assertTrue(stringRow.get(1).equals("SLAVE"));
- int scn = Integer.parseInt(stringRow.get(2));
- Assert.assertTrue(scn > prevSCN);
- prevSCN = scn;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
index 9686e16..95abd29 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
@@ -179,30 +179,63 @@ public class TestDefaultMessagingService {
recipientCriteria.setSelfExcluded(false);
AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
+ // all instances, all partitions
recipientCriteria.setSelfExcluded(false);
recipientCriteria.setInstanceName("%");
recipientCriteria.setResource("DB");
recipientCriteria.setPartition("%");
AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template));
- recipientCriteria.setSelfExcluded(true);
- recipientCriteria.setInstanceName("%");
+ // all instances, all partitions, use * instead of %
+ recipientCriteria.setSelfExcluded(false);
+ recipientCriteria.setInstanceName("*");
+ recipientCriteria.setResource("DB");
+ recipientCriteria.setPartition("*");
+ AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template));
+
+ // tail pattern
+ recipientCriteria.setSelfExcluded(false);
+ recipientCriteria.setInstanceName("localhost%");
recipientCriteria.setResource("DB");
recipientCriteria.setPartition("%");
- AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
+ AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template));
+ // exclude this instance, send to all others for all partitions
recipientCriteria.setSelfExcluded(true);
recipientCriteria.setInstanceName("%");
recipientCriteria.setResource("DB");
recipientCriteria.setPartition("%");
AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
+ // single instance, all partitions
recipientCriteria.setSelfExcluded(true);
recipientCriteria.setInstanceName("localhost_12920");
recipientCriteria.setResource("DB");
recipientCriteria.setPartition("%");
AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template));
+ // single character wildcards
+ recipientCriteria.setSelfExcluded(true);
+ recipientCriteria.setInstanceName("l_calhost_12_20");
+ recipientCriteria.setResource("DB");
+ recipientCriteria.setPartition("%");
+ AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template));
+
+ // head pattern
+ recipientCriteria.setSelfExcluded(true);
+ recipientCriteria.setInstanceName("%12920");
+ recipientCriteria.setResource("DB");
+ recipientCriteria.setPartition("%");
+ AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template));
+
+ // middle pattern
+ recipientCriteria.setSelfExcluded(true);
+ recipientCriteria.setInstanceName("l%_12920");
+ recipientCriteria.setResource("DB");
+ recipientCriteria.setPartition("%");
+ AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template));
+
+ // send to a controller
recipientCriteria.setSelfExcluded(true);
recipientCriteria.setInstanceName("localhost_12920");
recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
http://git-wip-us.apache.org/repos/asf/helix/blob/5019d96e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f3907f8..77eb495 100644
--- a/pom.xml
+++ b/pom.xml
@@ -168,11 +168,6 @@ under the License.
<name>SnakeYAML repository</name>
<url>http://oss.sonatype.org/content/groups/public/</url>
</repository>
- <repository>
- <id>jboss-fs-public</id>
- <name>JBoss FuseSource repository</name>
- <url>http://repository.jboss.org/nexus/content/groups/fs-public/</url>
- </repository>
</repositories>
[3/3] git commit: [HELIX-392] Write a test to ensure that ZK
connection loss is silent
Posted by ka...@apache.org.
[HELIX-392] Write a test to ensure that ZK connection loss is silent
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/6e587b2d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/6e587b2d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/6e587b2d
Branch: refs/heads/helix-0.6.2-release
Commit: 6e587b2d1252fed54b42f6daefeee12242f22b1c
Parents: 5019d96
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Feb 26 17:55:08 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Mar 14 10:02:36 2014 -0700
----------------------------------------------------------------------
.../TestCorrectnessOnConnectivityLoss.java | 203 +++++++++++++++++++
1 file changed, 203 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/6e587b2d/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
new file mode 100644
index 0000000..3b44f2c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
@@ -0,0 +1,203 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+public class TestCorrectnessOnConnectivityLoss {
+ private static final String ZK_ADDR = "localhost:2189";
+ private ZkServer _zkServer;
+ private String _clusterName;
+ private ClusterControllerManager _controller;
+
+ /**
+  * Per-test setup: starts a ZK server on ZK_ADDR, creates a cluster with a single
+  * resource, partition, participant and replica using the OnlineOffline state model in
+  * FULL_AUTO mode, and connects a controller named "controller0".
+  * @throws Exception if the server, the cluster setup or the controller connection fails
+  */
+ @BeforeMethod
+ public void beforeMethod() throws Exception {
+   _zkServer = TestHelper.startZkServer(ZK_ADDR);
+
+   // Cluster name is "<class>_<method>" as reported by TestHelper
+   _clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
+
+   final int participantStartPort = 12918;
+   final String participantHost = "localhost";
+   final String resourceNamePrefix = "resource";
+   final int resourceCount = 1;
+   final int partitionCount = 1;
+   final int participantCount = 1;
+   final int replicaCount = 1;
+   final String stateModelName = "OnlineOffline";
+   final boolean doRebalance = true;
+   TestHelper.setupCluster(_clusterName, ZK_ADDR, participantStartPort, participantHost,
+       resourceNamePrefix, resourceCount, partitionCount, participantCount, replicaCount,
+       stateModelName, RebalanceMode.FULL_AUTO, doRebalance);
+
+   _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, "controller0");
+   _controller.connect();
+ }
+
+ /**
+  * Checks that stopping the ZK server does not trigger additional state transitions on a
+  * connected participant. The test connects one participant, waits for the external view to
+  * match the best possible state, asserts that exactly one transition (to ONLINE) was counted,
+  * then stops the ZK server and asserts that the counts are unchanged one second later.
+  * NOTE(review): the participant is never disconnected in this method; confirm it cannot
+  * interfere with a later test that reuses the same ZK address and instance name.
+  * @throws Exception if connecting, verifying or sleeping fails
+  */
+ @Test
+ public void testParticipant() throws Exception {
+ // Shared with every state model created by the factory; keyed by the state reached
+ Map<String, Integer> stateReachedCounts = Maps.newHashMap();
+ HelixManager participant =
+ HelixManagerFactory.getZKHelixManager(_clusterName, "localhost_12918",
+ InstanceType.PARTICIPANT, ZK_ADDR);
+ participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
+ new MyStateModelFactory(stateReachedCounts));
+ participant.connect();
+
+ // Fixed delay before verification; presumably gives the controller time to react
+ Thread.sleep(1000);
+
+ // Ensure that the external view coalesces
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ _clusterName));
+ Assert.assertTrue(result);
+
+ // Ensure that there was only one state transition
+ Assert.assertEquals(stateReachedCounts.size(), 1);
+ Assert.assertTrue(stateReachedCounts.containsKey("ONLINE"));
+ Assert.assertEquals(stateReachedCounts.get("ONLINE").intValue(), 1);
+
+ // Now let's stop the ZK server; this should do nothing
+ TestHelper.stopZkServer(_zkServer);
+ // Leave one second for any (unwanted) transition or rollback to be recorded
+ Thread.sleep(1000);
+
+ // Verify no change
+ Assert.assertEquals(stateReachedCounts.size(), 1);
+ Assert.assertTrue(stateReachedCounts.containsKey("ONLINE"));
+ Assert.assertEquals(stateReachedCounts.get("ONLINE").intValue(), 1);
+ }
+
+ /**
+  * Checks that a spectator's routing table keeps answering after the ZK server is stopped.
+  * The test connects one participant and one spectator, registers a RoutingTableProvider as
+  * config and external view listener, stops the ZK server, and then asserts that the provider
+  * reports one ONLINE and zero OFFLINE instances for "resource0".
+  * NOTE(review): convergence before the server stop relies only on the one-second sleep, and
+  * neither manager is disconnected afterwards; confirm both are intended.
+  * @throws Exception if connecting, registering listeners or sleeping fails
+  */
+ // presumably suppresses the warning for addConfigChangeListener; verify against HelixManager
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testSpectator() throws Exception {
+ Map<String, Integer> stateReachedCounts = Maps.newHashMap();
+ HelixManager participant =
+ HelixManagerFactory.getZKHelixManager(_clusterName, "localhost_12918",
+ InstanceType.PARTICIPANT, ZK_ADDR);
+ participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
+ new MyStateModelFactory(stateReachedCounts));
+ participant.connect();
+
+ // The provider is fed by the spectator's config and external view callbacks
+ RoutingTableProvider routingTableProvider = new RoutingTableProvider();
+ HelixManager spectator =
+ HelixManagerFactory.getZKHelixManager(_clusterName, "spectator", InstanceType.SPECTATOR,
+ ZK_ADDR);
+ spectator.connect();
+ spectator.addConfigChangeListener(routingTableProvider);
+ spectator.addExternalViewChangeListener(routingTableProvider);
+ // Fixed delay so the listeners can receive their initial callbacks before ZK goes away
+ Thread.sleep(1000);
+
+ // Now let's stop the ZK server; this should do nothing
+ TestHelper.stopZkServer(_zkServer);
+ Thread.sleep(1000);
+
+ // Verify routing table still works
+ Assert.assertEquals(routingTableProvider.getInstances("resource0", "ONLINE").size(), 1);
+ Assert.assertEquals(routingTableProvider.getInstances("resource0", "OFFLINE").size(), 0);
+ }
+
+ /**
+  * Per-test teardown: stops the ZK server started in beforeMethod.
+  * NOTE(review): both test methods already stop the server themselves, so on their success
+  * path this is a second stop of the same server; confirm TestHelper.stopZkServer tolerates
+  * that. The controller connected in beforeMethod is not stopped here; verify that is intended.
+  * @throws Exception if stopping the server fails
+  */
+ @AfterMethod
+ public void afterMethod() throws Exception {
+ TestHelper.stopZkServer(_zkServer);
+ }
+
+ @StateModelInfo(initialState = "OFFLINE", states = {
+ "MASTER", "SLAVE", "OFFLINE", "ERROR"
+ })
+ public static class MyStateModel extends StateModel {
+ private final Map<String, Integer> _counts;
+
+ public MyStateModel(Map<String, Integer> counts) {
+ _counts = counts;
+ }
+
+ @Transition(to = "ONLINE", from = "OFFLINE")
+ public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+ incrementCount(message.getToState());
+ }
+
+ @Transition(to = "OFFLINE", from = "ONLINE")
+ public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+ incrementCount(message.getToState());
+ }
+
+ @Transition(to = "DROPPED", from = "OFFLINE")
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+ incrementCount(message.getToState());
+ }
+
+ @Transition(to = "OFFLINE", from = "ERROR")
+ public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+ incrementCount(message.getToState());
+ }
+
+ @Transition(to = "DROPPED", from = "ERROR")
+ public void onBecomeDroppedFromError(Message message, NotificationContext context) {
+ incrementCount(message.getToState());
+ }
+
+ @Override
+ public void rollbackOnError(Message message, NotificationContext context,
+ StateTransitionError error) {
+ incrementCount("rollback");
+ }
+
+ private synchronized void incrementCount(String toState) {
+ int current = (_counts.containsKey(toState)) ? _counts.get(toState) : 0;
+ _counts.put(toState, current + 1);
+ }
+ }
+
+ /**
+  * Factory that hands the same counter map to every MyStateModel it creates, regardless of
+  * the partition the model is created for.
+  */
+ public static class MyStateModelFactory extends StateModelFactory<MyStateModel> {
+   private final Map<String, Integer> _sharedCounts;
+
+   /**
+    * @param counts map passed on to every state model created by this factory
+    */
+   public MyStateModelFactory(Map<String, Integer> counts) {
+     this._sharedCounts = counts;
+   }
+
+   /**
+    * @param partitionId partition the model is created for; not used
+    * @return a new model backed by the shared counter map
+    */
+   @Override
+   public MyStateModel createNewStateModel(String partitionId) {
+     final MyStateModel model = new MyStateModel(_sharedCounts);
+     return model;
+   }
+ }
+}