You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2016/04/07 16:59:52 UTC
[17/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master'
into hyracks-merge2
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index b102e0b,0000000..e23837b
mode 100644,000000..100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@@ -1,798 -1,0 +1,802 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.aql;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.utils.ServletUtil.Servlets;
+import org.apache.asterix.test.server.ITestServer;
+import org.apache.asterix.test.server.TestServerProvider;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.context.TestCaseContext.OutputFormat;
+import org.apache.asterix.testframework.context.TestFileContext;
+import org.apache.asterix.testframework.xml.TestCase.CompilationUnit;
+import org.apache.asterix.testframework.xml.TestGroup;
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethod;
+import org.apache.commons.httpclient.HttpMethodBase;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.NameValuePair;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.commons.io.IOUtils;
++import org.apache.commons.lang3.mutable.MutableInt;
+import org.json.JSONObject;
+
+public class TestExecutor {
+
+ /*
+ * Static variables
+ */
+ protected static final Logger LOGGER = Logger.getLogger(TestExecutor.class.getName());
+ // see
+ // https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers/417184
+ private static final long MAX_URL_LENGTH = 2000l;
+ private static Method managixExecuteMethod = null;
+ private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>();
+
+ /*
+ * Instance members
+ */
+ private String host;
+ private int port;
+ private ITestLibrarian librarian;
+
+ public TestExecutor() {
+ host = "127.0.0.1";
+ port = 19002;
+ }
+
    /**
     * Creates an executor targeting the REST API at the given endpoint.
     *
     * @param host REST API host name or IP address
     * @param port REST API port
     */
    public TestExecutor(String host, int port) {
        this.host = host;
        this.port = port;
    }
+
    /** Sets the librarian that services "lib" (install/uninstall) test statements. */
    public void setLibrarian(ITestLibrarian librarian) {
        this.librarian = librarian;
    }
+
+ /**
+ * Probably does not work well with symlinks.
+ */
+ public boolean deleteRec(File path) {
+ if (path.isDirectory()) {
+ for (File f : path.listFiles()) {
+ if (!deleteRec(f)) {
+ return false;
+ }
+ }
+ }
+ return path.delete();
+ }
+
    /**
     * Compares an actual result file against the expected result file line by line,
     * throwing on the first mismatch. Fields following a "Time" marker and fields
     * containing "DatasetId" are skipped, because they vary between runs (metadata tests).
     *
     * @param scriptFile the test file, used only in error messages
     * @param print unused here — NOTE(review): consider removing or routing diagnostics to it
     * @param expectedFile expected output, read as UTF-8
     * @param actualFile actual output, read as UTF-8
     * @throws Exception if the two files differ
     */
    public void runScriptAndCompareWithResult(File scriptFile, PrintWriter print, File expectedFile, File actualFile)
            throws Exception {
        System.err.println("Expected results file: " + expectedFile.toString());
        BufferedReader readerExpected = new BufferedReader(
                new InputStreamReader(new FileInputStream(expectedFile), "UTF-8"));
        BufferedReader readerActual = new BufferedReader(
                new InputStreamReader(new FileInputStream(actualFile), "UTF-8"));
        String lineExpected, lineActual;
        int num = 1; // 1-based line counter for error messages
        try {
            while ((lineExpected = readerExpected.readLine()) != null) {
                lineActual = readerActual.readLine();
                // Assert.assertEquals(lineExpected, lineActual);
                if (lineActual == null) {
                    // Tolerate trailing blank lines in the expected file.
                    if (lineExpected.isEmpty()) {
                        continue;
                    }
                    throw new Exception(
                            "Result for " + scriptFile + " changed at line " + num + ":\n< " + lineExpected + "\n> ");
                }

                // Comparing result equality but ignore "Time"-prefixed fields. (for metadata tests.)
                String[] lineSplitsExpected = lineExpected.split("Time");
                String[] lineSplitsActual = lineActual.split("Time");
                if (lineSplitsExpected.length != lineSplitsActual.length) {
                    throw new Exception("Result for " + scriptFile + " changed at line " + num + ":\n< " + lineExpected
                            + "\n> " + lineActual);
                }
                // The segment before the first "Time" marker must match (per equalStrings' rules).
                if (!equalStrings(lineSplitsExpected[0], lineSplitsActual[0])) {
                    throw new Exception("Result for " + scriptFile + " changed at line " + num + ":\n< " + lineExpected
                            + "\n> " + lineActual);
                }

                // For each segment after a "Time" marker, compare its comma-separated pieces,
                // skipping index 0 (the piece adjacent to the timestamp) and "DatasetId" pieces.
                for (int i = 1; i < lineSplitsExpected.length; i++) {
                    String[] splitsByCommaExpected = lineSplitsExpected[i].split(",");
                    String[] splitsByCommaActual = lineSplitsActual[i].split(",");
                    if (splitsByCommaExpected.length != splitsByCommaActual.length) {
                        throw new Exception("Result for " + scriptFile + " changed at line " + num + ":\n< "
                                + lineExpected + "\n> " + lineActual);
                    }
                    for (int j = 1; j < splitsByCommaExpected.length; j++) {
                        if (splitsByCommaExpected[j].indexOf("DatasetId") >= 0) {
                            // Ignore the field "DatasetId", which is different for different runs.
                            // (for metadata tests)
                            continue;
                        }
                        if (!equalStrings(splitsByCommaExpected[j], splitsByCommaActual[j])) {
                            throw new Exception("Result for " + scriptFile + " changed at line " + num + ":\n< "
                                    + lineExpected + "\n> " + lineActual);
                        }
                    }
                }

                ++num;
            }
            // Expected file exhausted; any remaining actual content is a mismatch.
            lineActual = readerActual.readLine();
            if (lineActual != null) {
                throw new Exception("Result for " + scriptFile + " changed at line " + num + ":\n< \n> " + lineActual);
            }
        } finally {
            readerExpected.close();
            readerActual.close();
        }

    }
+
+ private boolean equalStrings(String s1, String s2) {
+ String[] rowsOne = s1.split("\n");
+ String[] rowsTwo = s2.split("\n");
+
+ for (int i = 0; i < rowsOne.length; i++) {
+ String row1 = rowsOne[i];
+ String row2 = rowsTwo[i];
+
+ if (row1.equals(row2)) {
+ continue;
+ }
+
+ String[] fields1 = row1.split(" ");
+ String[] fields2 = row2.split(" ");
+
+ boolean bagEncountered = false;
+ Set<String> bagElements1 = new HashSet<String>();
+ Set<String> bagElements2 = new HashSet<String>();
+
+ for (int j = 0; j < fields1.length; j++) {
+ if (j >= fields2.length) {
+ return false;
+ } else if (fields1[j].equals(fields2[j])) {
+ bagEncountered = fields1[j].equals("{{");
+ if (fields1[j].startsWith("}}")) {
+ if (!bagElements1.equals(bagElements2)) {
+ return false;
+ }
+ bagEncountered = false;
+ bagElements1.clear();
+ bagElements2.clear();
+ }
+ continue;
+ } else if (fields1[j].indexOf('.') < 0) {
+ if (bagEncountered) {
+ bagElements1.add(fields1[j].replaceAll(",$", ""));
+ bagElements2.add(fields2[j].replaceAll(",$", ""));
+ continue;
+ }
+ return false;
+ } else {
+ // If the fields are floating-point numbers, test them
+ // for equality safely
+ fields1[j] = fields1[j].split(",")[0];
+ fields2[j] = fields2[j].split(",")[0];
+ try {
+ Double double1 = Double.parseDouble(fields1[j]);
+ Double double2 = Double.parseDouble(fields2[j]);
+ float float1 = (float) double1.doubleValue();
+ float float2 = (float) double2.doubleValue();
+
+ if (Math.abs(float1 - float2) == 0) {
+ continue;
+ } else {
+ return false;
+ }
+ } catch (NumberFormatException ignored) {
+ // Guess they weren't numbers - must simply not be equal
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ // For tests where you simply want the byte-for-byte output.
+ private static void writeOutputToFile(File actualFile, InputStream resultStream) throws Exception {
+ try (FileOutputStream out = new FileOutputStream(actualFile)) {
+ IOUtils.copy(resultStream, out);
+ }
+ }
+
    /**
     * Executes the given HTTP method and returns the status code. A non-200
     * response is assumed to carry a JSON error body with "error-code", "summary"
     * and "stacktrace" fields, which are folded into the thrown exception's
     * message — NOTE(review): this assumption holds only while the servlets
     * return JSON errors regardless of the requested output format; see QQQ below.
     *
     * @throws Exception if the request fails or the server returns non-200
     */
    private int executeHttpMethod(HttpMethod method) throws Exception {
        HttpClient client = new HttpClient();
        int statusCode;
        try {
            statusCode = client.executeMethod(method);
        } catch (Exception e) {
            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
            e.printStackTrace();
            throw e;
        }
        if (statusCode != HttpStatus.SC_OK) {
            // QQQ For now, we are indeed assuming we get back JSON errors.
            // In future this may be changed depending on the requested
            // output format sent to the servlet.
            String errorBody = method.getResponseBodyAsString();
            JSONObject result = new JSONObject(errorBody);
            String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
                    result.getString("stacktrace") };
            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
            String exceptionMsg = "HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
                    + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2];
            throw new Exception(exceptionMsg);
        }
        return statusCode;
    }
+
    // Executes Query and returns results as JSONArray
    /**
     * Executes a query against {@code url} and returns the raw response stream.
     * Short requests go via GET with the query in the "query" parameter; longer
     * ones POST the query text as the request body.
     * NOTE(review): the GET/POST threshold compares un-encoded lengths, so
     * URL-encoding could push an accepted GET past the real limit — confirm.
     *
     * @param str the query text
     * @param fmt output format, requested via the Accept header
     * @param url the query servlet endpoint
     * @param params extra name/value pairs (GET path only; the POST path cannot carry them)
     */
    public InputStream executeQuery(String str, OutputFormat fmt, String url, List<CompilationUnit.Parameter> params)
            throws Exception {
        HttpMethodBase method = null;
        if (str.length() + url.length() < MAX_URL_LENGTH) {
            // Use GET for small-ish queries
            method = new GetMethod(url);
            NameValuePair[] parameters = new NameValuePair[params.size() + 1];
            parameters[0] = new NameValuePair("query", str);
            int i = 1;
            for (CompilationUnit.Parameter param : params) {
                parameters[i++] = new NameValuePair(param.getName(), param.getValue());
            }
            method.setQueryString(parameters);
        } else {
            // Use POST for bigger ones to avoid 413 FULL_HEAD
            // QQQ POST API doesn't allow encoding additional parameters
            method = new PostMethod(url);
            ((PostMethod) method).setRequestEntity(new StringRequestEntity(str));
        }

        // Set accepted output response type
        method.setRequestHeader("Accept", fmt.mimeType());
        // Provide custom retry handler is necessary
        method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false));
        executeHttpMethod(method);
        return method.getResponseBodyAsStream();
    }
+
+ public InputStream executeClusterStateQuery(OutputFormat fmt, String url) throws Exception {
+ HttpMethodBase method = new GetMethod(url);
+
+ // Set accepted output response type
+ method.setRequestHeader("Accept", fmt.mimeType());
+ // Provide custom retry handler is necessary
+ method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false));
+ executeHttpMethod(method);
+ return method.getResponseBodyAsStream();
+ }
+
    // To execute Update statements
    // Insert and Delete statements are executed here
    /**
     * POSTs an update statement (insert/delete) to the given servlet URL.
     * The response body is ignored; a non-200 response raises via executeHttpMethod.
     */
    public void executeUpdate(String str, String url) throws Exception {
        // Create a method instance.
        PostMethod method = new PostMethod(url);
        method.setRequestEntity(new StringRequestEntity(str));

        // Provide custom retry handler is necessary
        method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false));

        // Execute the method.
        executeHttpMethod(method);
    }
+
+ // Executes AQL in either async or async-defer mode.
+ public InputStream executeAnyAQLAsync(String str, boolean defer, OutputFormat fmt, String url) throws Exception {
+ // Create a method instance.
+ PostMethod method = new PostMethod(url);
+ if (defer) {
+ method.setQueryString(new NameValuePair[] { new NameValuePair("mode", "asynchronous-deferred") });
+ } else {
+ method.setQueryString(new NameValuePair[] { new NameValuePair("mode", "asynchronous") });
+ }
+ method.setRequestEntity(new StringRequestEntity(str));
+ method.setRequestHeader("Accept", fmt.mimeType());
+
+ // Provide custom retry handler is necessary
+ method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false));
+ executeHttpMethod(method);
+ InputStream resultStream = method.getResponseBodyAsStream();
+
+ String theHandle = IOUtils.toString(resultStream, "UTF-8");
+
+ // take the handle and parse it so results can be retrieved
+ InputStream handleResult = getHandleResult(theHandle, fmt);
+ return handleResult;
+ }
+
    /**
     * Resolves an async query handle into its result stream by calling the
     * QUERY_RESULT servlet with the handle as a query parameter.
     */
    private InputStream getHandleResult(String handle, OutputFormat fmt) throws Exception {
        final String url = "http://" + host + ":" + port + Servlets.QUERY_RESULT.getPath();

        // Create a method instance.
        GetMethod method = new GetMethod(url);
        method.setQueryString(new NameValuePair[] { new NameValuePair("handle", handle) });
        method.setRequestHeader("Accept", fmt.mimeType());

        // Provide custom retry handler is necessary
        method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false));

        executeHttpMethod(method);
        return method.getResponseBodyAsStream();
    }
+
    // To execute DDL and Update statements
    // create type statement
    // create dataset statement
    // create index statement
    // create dataverse statement
    // create function statement
    /**
     * POSTs a DDL statement to the given servlet URL. The response body is
     * ignored; a non-200 response raises via executeHttpMethod.
     */
    public void executeDDL(String str, String url) throws Exception {
        // Create a method instance.
        PostMethod method = new PostMethod(url);
        method.setRequestEntity(new StringRequestEntity(str));
        // Provide custom retry handler is necessary
        method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false));

        // Execute the method.
        executeHttpMethod(method);
    }
+
+ // Method that reads a DDL/Update/Query File
+ // and returns the contents as a string
+ // This string is later passed to REST API for execution.
+ public String readTestFile(File testFile) throws Exception {
+ BufferedReader reader = new BufferedReader(new FileReader(testFile));
+ String line = null;
+ StringBuilder stringBuilder = new StringBuilder();
+ String ls = System.getProperty("line.separator");
+ while ((line = reader.readLine()) != null) {
+ stringBuilder.append(line);
+ stringBuilder.append(ls);
+ }
+ reader.close();
+ return stringBuilder.toString();
+ }
+
    /**
     * Executes a managix command by reflectively invoking
     * AsterixInstallerIntegrationUtil.executeCommand; reflection avoids a
     * compile-time dependency on the installer module. The resolved Method is
     * cached in {@code managixExecuteMethod} after the first call.
     * NOTE(review): the lazy initialization is not thread-safe — fine if tests
     * run single-threaded, confirm before parallelizing.
     */
    public static void executeManagixCommand(String command) throws ClassNotFoundException, NoSuchMethodException,
            SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
        if (managixExecuteMethod == null) {
            Class<?> clazz = Class.forName("org.apache.asterix.installer.test.AsterixInstallerIntegrationUtil");
            managixExecuteMethod = clazz.getMethod("executeCommand", String.class);
        }
        managixExecuteMethod.invoke(null, command);
    }
+
+ public static String executeScript(ProcessBuilder pb, String scriptPath) throws Exception {
+ pb.command(scriptPath);
+ Process p = pb.start();
+ p.waitFor();
+ return getProcessOutput(p);
+ }
+
+ private static String executeVagrantScript(ProcessBuilder pb, String node, String scriptName) throws Exception {
+ pb.command("vagrant", "ssh", node, "--", pb.environment().get("SCRIPT_HOME") + scriptName);
+ Process p = pb.start();
+ p.waitFor();
+ InputStream input = p.getInputStream();
+ return IOUtils.toString(input, StandardCharsets.UTF_8.name());
+ }
+
+ private static String executeVagrantManagix(ProcessBuilder pb, String command) throws Exception {
+ pb.command("vagrant", "ssh", "cc", "--", pb.environment().get("MANAGIX_HOME") + command);
+ Process p = pb.start();
+ p.waitFor();
+ InputStream input = p.getInputStream();
+ return IOUtils.toString(input, StandardCharsets.UTF_8.name());
+ }
+
+ private static String getScriptPath(String queryPath, String scriptBasePath, String scriptFileName) {
+ String targetWord = "queries" + File.separator;
+ int targetWordSize = targetWord.lastIndexOf(File.separator);
+ int beginIndex = queryPath.lastIndexOf(targetWord) + targetWordSize;
+ int endIndex = queryPath.lastIndexOf(File.separator);
+ String prefix = queryPath.substring(beginIndex, endIndex);
+ String scriptPath = scriptBasePath + prefix + File.separator + scriptFileName;
+ return scriptPath;
+ }
+
    /**
     * Collects a finished process's stdout followed by its stderr into one string.
     * If anything was written to stderr, the combined output is logged and thrown
     * as an exception instead of returned.
     * NOTE(review): both streams are drained sequentially after the process exits
     * (callers invoke waitFor() first); a process that fills an output pipe before
     * exiting could deadlock — confirm script outputs stay small.
     */
    private static String getProcessOutput(Process p) throws Exception {
        StringBuilder s = new StringBuilder();
        BufferedInputStream bisIn = new BufferedInputStream(p.getInputStream());
        StringWriter writerIn = new StringWriter();
        IOUtils.copy(bisIn, writerIn, "UTF-8");
        s.append(writerIn.toString());

        BufferedInputStream bisErr = new BufferedInputStream(p.getErrorStream());
        StringWriter writerErr = new StringWriter();
        IOUtils.copy(bisErr, writerErr, "UTF-8");
        s.append(writerErr.toString());
        // Any stderr output is treated as a script failure.
        if (writerErr.toString().length() > 0) {
            StringBuilder sbErr = new StringBuilder();
            sbErr.append("script execution failed - error message:\n");
            sbErr.append("-------------------------------------------\n");
            sbErr.append(s.toString());
            sbErr.append("-------------------------------------------\n");
            LOGGER.info(sbErr.toString().trim());
            throw new Exception(s.toString().trim());
        }
        return s.toString();
    }
+
    /**
     * Runs all compilation units of the test case; convenience overload that
     * passes {@code null} for the failed-group argument.
     */
    public void executeTest(String actualPath, TestCaseContext testCaseCtx, ProcessBuilder pb,
            boolean isDmlRecoveryTest) throws Exception {
        executeTest(actualPath, testCaseCtx, pb, isDmlRecoveryTest, null);
    }
+
    /**
     * Executes a single test statement (one file of a compilation unit) according
     * to its declared type — "ddl", "update", "query"/"async"/"asyncdefer", "mgx",
     * "txnqbc"/"txnqar", "txneu", "script", "sleep", "errddl", "vscript", "vmgx",
     * "cstate", "server", "lib".
     *
     * @param statement the statement text (already read from {@code testFile})
     * @param queryCount in/out counter of result-producing statements; used to index
     *            into {@code expectedResultFileCtxs} and incremented after each comparison
     * @throws Exception on any failure, including an expected-vs-actual result mismatch
     */
    public void executeTest(TestCaseContext testCaseCtx, TestFileContext ctx, String statement,
            boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount,
            List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath) throws Exception {
        File qbcFile;
        boolean failed = false;
        File expectedResultFile;
        switch (ctx.getType()) {
            case "ddl":
                // Route by file extension: .aql files go to the AQL servlet, others to SQL++.
                if (ctx.getFile().getName().endsWith("aql")) {
                    executeDDL(statement, "http://" + host + ":" + port + Servlets.AQL_DDL.getPath());
                } else {
                    executeDDL(statement, "http://" + host + ":" + port + Servlets.SQLPP_DDL.getPath());
                }
                break;
            case "update":
                // isDmlRecoveryTest: set IP address
                if (isDmlRecoveryTest && statement.contains("nc1://")) {
                    statement = statement.replaceAll("nc1://", "127.0.0.1://../../../../../../asterix-app/");
                }
                if (ctx.getFile().getName().endsWith("aql")) {
                    executeUpdate(statement, "http://" + host + ":" + port + Servlets.AQL_UPDATE.getPath());
                } else {
                    executeUpdate(statement, "http://" + host + ":" + port + Servlets.SQLPP_UPDATE.getPath());
                }
                break;
            case "query":
            case "async":
            case "asyncdefer":
                // isDmlRecoveryTest: insert Crash and Recovery
                if (isDmlRecoveryTest) {
                    executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery"
                            + File.separator + "kill_cc_and_nc.sh");
                    executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery"
                            + File.separator + "stop_and_start.sh");
                }
                // NOTE: resultStream/fmt/actualResultFile are declared at case level on purpose;
                // switch cases share one scope, and the txnqbc/txnqar/cstate cases below reuse them.
                InputStream resultStream = null;
                OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
                if (ctx.getFile().getName().endsWith("aql")) {
                    if (ctx.getType().equalsIgnoreCase("query")) {
                        resultStream = executeQuery(statement, fmt,
                                "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(), cUnit.getParameter());
                    } else if (ctx.getType().equalsIgnoreCase("async")) {
                        resultStream = executeAnyAQLAsync(statement, false, fmt,
                                "http://" + host + ":" + port + Servlets.AQL.getPath());
                    } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
                        resultStream = executeAnyAQLAsync(statement, true, fmt,
                                "http://" + host + ":" + port + Servlets.AQL.getPath());
                    }
                } else {
                    if (ctx.getType().equalsIgnoreCase("query")) {
                        resultStream = executeQuery(statement, fmt,
                                "http://" + host + ":" + port + Servlets.SQLPP_QUERY.getPath(), cUnit.getParameter());
                    } else if (ctx.getType().equalsIgnoreCase("async")) {
                        resultStream = executeAnyAQLAsync(statement, false, fmt,
                                "http://" + host + ":" + port + Servlets.SQLPP.getPath());
                    } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
                        resultStream = executeAnyAQLAsync(statement, true, fmt,
                                "http://" + host + ":" + port + Servlets.SQLPP.getPath());
                    }
                }

                if (queryCount.intValue() >= expectedResultFileCtxs.size()) {
                    throw new IllegalStateException("no result file for " + testFile.toString() + "; queryCount: "
                            + queryCount + ", filectxs.size: " + expectedResultFileCtxs.size());
                }
                expectedResultFile = expectedResultFileCtxs.get(queryCount.intValue()).getFile();

                File actualResultFile = testCaseCtx.getActualResultFile(cUnit, new File(actualPath));
                actualResultFile.getParentFile().mkdirs();
                writeOutputToFile(actualResultFile, resultStream);

                runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), expectedResultFile,
                        actualResultFile);
                queryCount.increment();
                break;
            case "mgx":
                executeManagixCommand(statement);
                break;
            case "txnqbc": // qbc represents query before crash
                resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
                        "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(), cUnit.getParameter());
                qbcFile = getTestCaseQueryBeforeCrashFile(actualPath, testCaseCtx, cUnit);
                qbcFile.getParentFile().mkdirs();
                writeOutputToFile(qbcFile, resultStream);
                break;
            case "txnqar": // qar represents query after recovery
                resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
                        "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(), cUnit.getParameter());
                File qarFile = new File(actualPath + File.separator
                        + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName()
                        + "_qar.adm");
                qarFile.getParentFile().mkdirs();
                writeOutputToFile(qarFile, resultStream);
                qbcFile = getTestCaseQueryBeforeCrashFile(actualPath, testCaseCtx, cUnit);
                // Post-recovery answer must match the pre-crash answer captured by "txnqbc".
                runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), qbcFile, qarFile);
                break;
            case "txneu": // eu represents erroneous update
                try {
                    executeUpdate(statement, "http://" + host + ":" + port + Servlets.AQL_UPDATE.getPath());
                } catch (Exception e) {
                    // An exception is expected.
                    failed = true;
                    e.printStackTrace();
                }
                if (!failed) {
                    throw new Exception("Test \"" + testFile + "\" FAILED!\n An exception" + "is expected.");
                }
                System.err.println("...but that was expected.");
                break;
            case "script":
                try {
                    String output = executeScript(pb, getScriptPath(testFile.getAbsolutePath(),
                            pb.environment().get("SCRIPT_HOME"), statement.trim()));
                    if (output.contains("ERROR")) {
                        throw new Exception(output);
                    }
                } catch (Exception e) {
                    throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
                }
                break;
            case "sleep":
                // The last line of the statement is the sleep duration in milliseconds.
                String[] lines = statement.split("\n");
                Thread.sleep(Long.parseLong(lines[lines.length - 1].trim()));
                break;
            case "errddl": // a ddlquery that expects error
                try {
                    executeDDL(statement, "http://" + host + ":" + port + Servlets.AQL_DDL.getPath());
                } catch (Exception e) {
                    // expected error happens
                    failed = true;
                    e.printStackTrace();
                }
                if (!failed) {
                    throw new Exception("Test \"" + testFile + "\" FAILED!\n An exception is expected.");
                }
                System.err.println("...but that was expected.");
                break;
            case "vscript": // a script that will be executed on a vagrant virtual node
                try {
                    // Expected statement format: "<node-id> <script-name>".
                    String[] command = statement.trim().split(" ");
                    if (command.length != 2) {
                        throw new Exception("invalid vagrant script format");
                    }
                    String nodeId = command[0];
                    String scriptName = command[1];
                    String output = executeVagrantScript(pb, nodeId, scriptName);
                    if (output.contains("ERROR")) {
                        throw new Exception(output);
                    }
                } catch (Exception e) {
                    throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
                }
                break;
            case "vmgx": // a managix command that will be executed on vagrant cc node
                try {
                    String output = executeVagrantManagix(pb, statement);
                    if (output.contains("ERROR")) {
                        throw new Exception(output);
                    }
                } catch (Exception e) {
                    throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
                }
                break;
            case "cstate": // cluster state query
                try {
                    fmt = OutputFormat.forCompilationUnit(cUnit);
                    resultStream = executeClusterStateQuery(fmt,
                            "http://" + host + ":" + port + Servlets.CLUSTER_STATE.getPath());
                    expectedResultFile = expectedResultFileCtxs.get(queryCount.intValue()).getFile();
                    actualResultFile = testCaseCtx.getActualResultFile(cUnit, new File(actualPath));
                    actualResultFile.getParentFile().mkdirs();
                    writeOutputToFile(actualResultFile, resultStream);
                    runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), expectedResultFile,
                            actualResultFile);
                    queryCount.increment();
                } catch (Exception e) {
                    throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
                }
                break;
            case "server": // (start <test server name> <port>
                           // [<arg1>][<arg2>][<arg3>]...|stop (<port>|all))
                try {
                    lines = statement.trim().split("\n");
                    String[] command = lines[lines.length - 1].trim().split(" ");
                    if (command.length < 2) {
                        throw new Exception("invalid server command format. expected format ="
                                + " (start <test server name> <port> [<arg1>][<arg2>][<arg3>]"
                                + "...|stop (<port>|all))");
                    }
                    String action = command[0];
                    if (action.equals("start")) {
                        if (command.length < 3) {
                            throw new Exception("invalid server start command. expected format ="
                                    + " (start <test server name> <port> [<arg1>][<arg2>][<arg3>]...");
                        }
                        String name = command[1];
                        // NOTE: this local shadows the instance field 'port'.
                        Integer port = new Integer(command[2]);
                        if (runningTestServers.containsKey(port)) {
                            throw new Exception("server with port " + port + " is already running");
                        }
                        ITestServer server = TestServerProvider.createTestServer(name, port);
                        server.configure(Arrays.copyOfRange(command, 3, command.length));
                        server.start();
                        runningTestServers.put(port, server);
                    } else if (action.equals("stop")) {
                        String target = command[1];
                        if (target.equals("all")) {
                            for (ITestServer server : runningTestServers.values()) {
                                server.stop();
                            }
                            runningTestServers.clear();
                        } else {
                            Integer port = new Integer(command[1]);
                            ITestServer server = runningTestServers.get(port);
                            if (server == null) {
                                throw new Exception("no server is listening to port " + port);
                            }
                            server.stop();
                            runningTestServers.remove(port);
                        }
                    } else {
                        throw new Exception("unknown server action");
                    }
                } catch (Exception e) {
                    throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
                }
                break;
            case "lib": // expected format <dataverse-name> <library-name>
                        // <library-directory>
                // TODO: make this case work well with entity names containing spaces by
                // looking for \"
                lines = statement.split("\n");
                String lastLine = lines[lines.length - 1];
                String[] command = lastLine.trim().split(" ");
                if (command.length < 3) {
                    throw new Exception("invalid library format");
                }
                String dataverse = command[1];
                String library = command[2];
                switch (command[0]) {
                    case "install":
                        if (command.length != 4) {
                            throw new Exception("invalid library format");
                        }
                        String libPath = command[3];
                        librarian.install(dataverse, library, libPath);
                        break;
                    case "uninstall":
                        if (command.length != 3) {
                            throw new Exception("invalid library format");
                        }
                        librarian.uninstall(dataverse, library);
                        break;
                    default:
                        throw new Exception("invalid library format");
                }
                break;
            default:
                throw new IllegalArgumentException("No statements of type " + ctx.getType());
        }
    }
++
+ public void executeTest(String actualPath, TestCaseContext testCaseCtx, ProcessBuilder pb,
+ boolean isDmlRecoveryTest, TestGroup failedGroup) throws Exception {
-
+ File testFile;
- File expectedResultFile;
+ String statement;
+ List<TestFileContext> expectedResultFileCtxs;
+ List<TestFileContext> testFileCtxs;
- File qbcFile = null;
- File qarFile = null;
- int queryCount = 0;
++ MutableInt queryCount = new MutableInt(0);
+ int numOfErrors = 0;
+ int numOfFiles = 0;
-
+ List<CompilationUnit> cUnits = testCaseCtx.getTestCase().getCompilationUnit();
+ for (CompilationUnit cUnit : cUnits) {
+ LOGGER.info(
+ "Starting [TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " ... ");
+ testFileCtxs = testCaseCtx.getTestFiles(cUnit);
+ expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit);
+ for (TestFileContext ctx : testFileCtxs) {
+ numOfFiles++;
+ testFile = ctx.getFile();
+ statement = readTestFile(testFile);
- boolean failed = false;
+ try {
- switch (ctx.getType()) {
- case "ddl":
- if (ctx.getFile().getName().endsWith("aql")) {
- executeDDL(statement, "http://" + host + ":" + port + Servlets.AQL_DDL.getPath());
- } else {
- executeDDL(statement, "http://" + host + ":" + port + Servlets.SQLPP_DDL.getPath());
- }
- break;
- case "update":
- // isDmlRecoveryTest: set IP address
- if (isDmlRecoveryTest && statement.contains("nc1://")) {
- statement = statement.replaceAll("nc1://",
- "127.0.0.1://../../../../../../asterix-app/");
- }
- if (ctx.getFile().getName().endsWith("aql")) {
- executeUpdate(statement, "http://" + host + ":" + port + Servlets.AQL_UPDATE.getPath());
- } else {
- executeUpdate(statement,
- "http://" + host + ":" + port + Servlets.SQLPP_UPDATE.getPath());
- }
- break;
- case "query":
- case "async":
- case "asyncdefer":
- // isDmlRecoveryTest: insert Crash and Recovery
- if (isDmlRecoveryTest) {
- executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery"
- + File.separator + "kill_cc_and_nc.sh");
- executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery"
- + File.separator + "stop_and_start.sh");
- }
- InputStream resultStream = null;
- OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
- if (ctx.getFile().getName().endsWith("aql")) {
- if (ctx.getType().equalsIgnoreCase("query")) {
- resultStream = executeQuery(statement, fmt,
- "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(),
- cUnit.getParameter());
- } else if (ctx.getType().equalsIgnoreCase("async")) {
- resultStream = executeAnyAQLAsync(statement, false, fmt,
- "http://" + host + ":" + port + Servlets.AQL.getPath());
- } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
- resultStream = executeAnyAQLAsync(statement, true, fmt,
- "http://" + host + ":" + port + Servlets.AQL.getPath());
- }
- } else {
- if (ctx.getType().equalsIgnoreCase("query")) {
- resultStream = executeQuery(statement, fmt,
- "http://" + host + ":" + port + Servlets.SQLPP_QUERY.getPath(),
- cUnit.getParameter());
- } else if (ctx.getType().equalsIgnoreCase("async")) {
- resultStream = executeAnyAQLAsync(statement, false, fmt,
- "http://" + host + ":" + port + Servlets.SQLPP.getPath());
- } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
- resultStream = executeAnyAQLAsync(statement, true, fmt,
- "http://" + host + ":" + port + Servlets.SQLPP.getPath());
- }
- }
-
- if (queryCount >= expectedResultFileCtxs.size()) {
- throw new IllegalStateException(
- "no result file for " + testFile.toString() + "; queryCount: " + queryCount
- + ", filectxs.size: " + expectedResultFileCtxs.size());
- }
- expectedResultFile = expectedResultFileCtxs.get(queryCount).getFile();
-
- File actualResultFile = testCaseCtx.getActualResultFile(cUnit, new File(actualPath));
- actualResultFile.getParentFile().mkdirs();
- writeOutputToFile(actualResultFile, resultStream);
-
- runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), expectedResultFile,
- actualResultFile);
- queryCount++;
- break;
- case "mgx":
- executeManagixCommand(statement);
- break;
- case "txnqbc": // qbc represents query before crash
- resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
- "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(), cUnit.getParameter());
- qbcFile = new File(actualPath + File.separator
- + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
- + cUnit.getName() + "_qbc.adm");
- qbcFile.getParentFile().mkdirs();
- writeOutputToFile(qbcFile, resultStream);
- break;
- case "txnqar": // qar represents query after recovery
- resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
- "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(), cUnit.getParameter());
- qarFile = new File(actualPath + File.separator
- + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
- + cUnit.getName() + "_qar.adm");
- qarFile.getParentFile().mkdirs();
- writeOutputToFile(qarFile, resultStream);
- runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), qbcFile, qarFile);
- break;
- case "txneu": // eu represents erroneous update
- try {
- executeUpdate(statement, "http://" + host + ":" + port + Servlets.AQL_UPDATE.getPath());
- } catch (Exception e) {
- // An exception is expected.
- failed = true;
- e.printStackTrace();
- }
- if (!failed) {
- throw new Exception(
- "Test \"" + testFile + "\" FAILED!\n An exception" + "is expected.");
- }
- System.err.println("...but that was expected.");
- break;
- case "script":
- try {
- String output = executeScript(pb, getScriptPath(testFile.getAbsolutePath(),
- pb.environment().get("SCRIPT_HOME"), statement.trim()));
- if (output.contains("ERROR")) {
- throw new Exception(output);
- }
- } catch (Exception e) {
- throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
- }
- break;
- case "sleep":
- String[] lines = statement.split("\n");
- Thread.sleep(Long.parseLong(lines[lines.length - 1].trim()));
- break;
- case "errddl": // a ddlquery that expects error
- try {
- executeDDL(statement, "http://" + host + ":" + port + Servlets.AQL_DDL.getPath());
- } catch (Exception e) {
- // expected error happens
- failed = true;
- e.printStackTrace();
- }
- if (!failed) {
- throw new Exception("Test \"" + testFile + "\" FAILED!\n An exception is expected.");
- }
- System.err.println("...but that was expected.");
- break;
- case "vscript": // a script that will be executed on a vagrant virtual node
- try {
- String[] command = statement.trim().split(" ");
- if (command.length != 2) {
- throw new Exception("invalid vagrant script format");
- }
- String nodeId = command[0];
- String scriptName = command[1];
- String output = executeVagrantScript(pb, nodeId, scriptName);
- if (output.contains("ERROR")) {
- throw new Exception(output);
- }
- } catch (Exception e) {
- throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
- }
- break;
- case "vmgx": // a managix command that will be executed on vagrant cc node
- try {
- String output = executeVagrantManagix(pb, statement);
- if (output.contains("ERROR")) {
- throw new Exception(output);
- }
- } catch (Exception e) {
- throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
- }
- break;
- case "cstate": // cluster state query
- try {
- fmt = OutputFormat.forCompilationUnit(cUnit);
- resultStream = executeClusterStateQuery(fmt,
- "http://" + host + ":" + port + Servlets.CLUSTER_STATE.getPath());
- expectedResultFile = expectedResultFileCtxs.get(queryCount).getFile();
- actualResultFile = testCaseCtx.getActualResultFile(cUnit, new File(actualPath));
- actualResultFile.getParentFile().mkdirs();
- writeOutputToFile(actualResultFile, resultStream);
- runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), expectedResultFile,
- actualResultFile);
- queryCount++;
- } catch (Exception e) {
- throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
- }
- break;
- case "server": // (start <test server name> <port>
- // [<arg1>][<arg2>][<arg3>]...|stop (<port>|all))
- try {
- lines = statement.trim().split("\n");
- String[] command = lines[lines.length - 1].trim().split(" ");
- if (command.length < 2) {
- throw new Exception("invalid server command format. expected format ="
- + " (start <test server name> <port> [<arg1>][<arg2>][<arg3>]"
- + "...|stop (<port>|all))");
- }
- String action = command[0];
- if (action.equals("start")) {
- if (command.length < 3) {
- throw new Exception("invalid server start command. expected format ="
- + " (start <test server name> <port> [<arg1>][<arg2>][<arg3>]...");
- }
- String name = command[1];
- Integer port = new Integer(command[2]);
- if (runningTestServers.containsKey(port)) {
- throw new Exception("server with port " + port + " is already running");
- }
- ITestServer server = TestServerProvider.createTestServer(name, port);
- server.configure(Arrays.copyOfRange(command, 3, command.length));
- server.start();
- runningTestServers.put(port, server);
- } else if (action.equals("stop")) {
- String target = command[1];
- if (target.equals("all")) {
- for (ITestServer server : runningTestServers.values()) {
- server.stop();
- }
- runningTestServers.clear();
- } else {
- Integer port = new Integer(command[1]);
- ITestServer server = runningTestServers.get(port);
- if (server == null) {
- throw new Exception("no server is listening to port " + port);
- }
- server.stop();
- runningTestServers.remove(port);
- }
- } else {
- throw new Exception("unknown server action");
- }
- } catch (Exception e) {
- throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
- }
- break;
- case "lib": // expected format <dataverse-name> <library-name>
- // <library-directory>
- // TODO: make this case work well with entity names containing spaces by
- // looking for \"
- lines = statement.split("\n");
- String lastLine = lines[lines.length - 1];
- String[] command = lastLine.trim().split(" ");
- if (command.length < 3) {
- throw new Exception("invalid library format");
- }
- String dataverse = command[1];
- String library = command[2];
- switch (command[0]) {
- case "install":
- if (command.length != 4) {
- throw new Exception("invalid library format");
- }
- String libPath = command[3];
- librarian.install(dataverse, library, libPath);
- break;
- case "uninstall":
- if (command.length != 3) {
- throw new Exception("invalid library format");
- }
- librarian.uninstall(dataverse, library);
- break;
- default:
- throw new Exception("invalid library format");
- }
- break;
- default:
- throw new IllegalArgumentException("No statements of type " + ctx.getType());
- }
-
++ executeTest(testCaseCtx, ctx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
++ expectedResultFileCtxs, testFile, actualPath);
+ } catch (Exception e) {
+ System.err.println("testFile " + testFile.toString() + " raised an exception:");
+ boolean unExpectedFailure = false;
+ numOfErrors++;
+ if (cUnit.getExpectedError().size() < numOfErrors) {
+ unExpectedFailure = true;
+ } else {
+ // Get the expected exception
+ String expectedError = cUnit.getExpectedError().get(numOfErrors - 1);
+ if (e.toString().contains(expectedError)) {
+ System.err.println("...but that was expected.");
+ } else {
+ unExpectedFailure = true;
+ }
+ }
+ if (unExpectedFailure) {
+ e.printStackTrace();
+ System.err.println("...Unexpected!");
+ if (failedGroup != null) {
+ failedGroup.getTestCase().add(testCaseCtx.getTestCase());
+ }
+ throw new Exception("Test \"" + testFile + "\" FAILED!", e);
+ }
+ } finally {
+ if (numOfFiles == testFileCtxs.size() && numOfErrors < cUnit.getExpectedError().size()) {
+ System.err.println("...Unexpected!");
+ Exception e = new Exception(
+ "Test \"" + cUnit.getName() + "\" FAILED!\nExpected error was not thrown...");
+ e.printStackTrace();
+ throw e;
+ } else if (numOfFiles == testFileCtxs.size()) {
+ LOGGER.info("[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName()
+ + " PASSED ");
+ }
+ }
+ }
+ }
+ }
++
++ private static File getTestCaseQueryBeforeCrashFile(String actualPath, TestCaseContext testCaseCtx,
++ CompilationUnit cUnit) {
++ return new File(
++ actualPath + File.separator + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
++ + cUnit.getName() + "_qbc.adm");
++ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/pom.xml
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/pom.xml
index 9a8540f,0000000..8c59cc4
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@@ -1,298 -1,0 +1,298 @@@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>apache-asterixdb</artifactId>
+ <groupId>org.apache.asterix</groupId>
+ <version>0.8.9-SNAPSHOT</version>
+ </parent>
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ <comments>A business-friendly OSS license</comments>
+ </license>
+ </licenses>
+ <artifactId>asterix-external-data</artifactId>
+ <properties>
+ <appendedResourcesDirectory>${basedir}/../src/main/appended-resources</appendedResourcesDirectory>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>lexer-generator-maven-plugin</artifactId>
+ <version>0.8.9-SNAPSHOT</version>
+ <configuration>
+ <grammarFile>src/main/resources/adm.grammar</grammarFile>
+ <outputDir>${project.build.directory}/generated-sources/org/apache/asterix/runtime/operators/file/adm</outputDir>
+ </configuration>
+ <executions>
+ <execution>
+ <id>generate-lexer</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>generate-lexer</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.9</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources/</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jvnet.jaxb2.maven2</groupId>
+ <artifactId>maven-jaxb2-plugin</artifactId>
+ <version>0.9.0</version>
+ <executions>
+ <execution>
+ <id>configuration</id>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ <configuration>
+ <schemaDirectory>src/main/resources/schema</schemaDirectory>
+ <schemaIncludes>
+ <include>library.xsd</include>
+ </schemaIncludes>
+ <generatePackage>org.apache.asterix.external.library</generatePackage>
+ <generateDirectory>${project.build.directory}/generated-sources/configuration</generateDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <includes>
+ <include>**/*.class</include>
+ <include>**/*.txt</include>
+ <include>**/DISCLAIMER</include>
+ <include>**/NOTICE</include>
+ <include>**/LICENSE</include>
+ <include>**/DEPENDENCIES</include>
+ </includes>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptor>src/main/assembly/binary-assembly-libzip.xml</descriptor>
+ <finalName>testlib-zip</finalName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId> org.apache.asterix</groupId>
+ <artifactId> lexer-generator-maven-plugin</artifactId>
+ <versionRange>[0.1,)</versionRange>
+ <goals>
+ <goal>generate-lexer</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <execute>
+ <runOnIncremental>false</runOnIncremental>
+ </execute>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId> org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <versionRange>[1.7,)</versionRange>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore />
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-om</artifactId>
+ <version>0.8.9-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-runtime</artifactId>
+ <version>0.8.9-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>algebricks-compiler</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.kenai.nbpwr</groupId>
+ <artifactId>org-apache-commons-io</artifactId>
+ <version>1.3.1-201002241208</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-core</artifactId>
- <version>[4.0,)</version>
++ <version>4.0.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-stream</artifactId>
- <version>[4.0,)</version>
++ <version>4.0.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>net.java.dev.rome</groupId>
+ <artifactId>rome-fetcher</artifactId>
+ <version>1.0.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>rome</artifactId>
+ <groupId>net.java.dev.rome</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>rome</groupId>
+ <artifactId>rome</artifactId>
+ <version>1.0.1-modified-01</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>jdom</groupId>
+ <artifactId>jdom</artifactId>
+ <version>1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-common</artifactId>
+ <version>0.8.9-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.microsoft.windowsazure</groupId>
+ <artifactId>microsoft-windowsazure-api</artifactId>
+ <version>0.4.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>0.13.0</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.jdo</groupId>
+ <artifactId>jdo2-api</artifactId>
+ <version>2.3-20090302111651</version>
+ </dependency>
+ <dependency>
+ <groupId>com.e-movimento.tinytools</groupId>
+ <artifactId>privilegedaccessor</artifactId>
+ <version>1.2.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.couchbase.client</groupId>
+ <artifactId>core-io</artifactId>
+ <version>1.2.3</version>
+ </dependency>
+ <dependency>
+ <groupId>io.reactivex</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>1.0.15</version>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index a03ad1a,0000000..d3abd50
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@@ -1,162 -1,0 +1,165 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.adapter.factory;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IIndexibleExternalDataSource;
+import org.apache.asterix.external.api.IIndexingAdapterFactory;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.dataset.adapter.FeedAdapter;
+import org.apache.asterix.external.dataset.adapter.GenericAdapter;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.provider.DataflowControllerProvider;
+import org.apache.asterix.external.provider.DatasourceFactoryProvider;
+import org.apache.asterix.external.provider.ParserFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
++import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterFactory {
+
+ private static final long serialVersionUID = 1L;
+ private IExternalDataSourceFactory dataSourceFactory;
+ private IDataParserFactory dataParserFactory;
+ private ARecordType recordType;
+ private Map<String, String> configuration;
+ private List<ExternalFile> files;
+ private boolean indexingOp;
+ private boolean isFeed;
+ private FileSplit[] feedLogFileSplits;
+ private ARecordType metaType;
++ private FeedLogManager feedLogManager = null;
+
+ @Override
+ public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
+ this.files = files;
+ this.indexingOp = indexingOp;
+ }
+
+ @Override
+ public String getAlias() {
+ return ExternalDataConstants.ALIAS_GENERIC_ADAPTER;
+ }
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException {
+ return dataSourceFactory.getPartitionConstraint();
+ }
+
+ /**
+ * Runs on each node controller (after serialization-deserialization)
+ */
+ @Override
+ public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
+ try {
+ restoreExternalObjects();
+ } catch (AsterixException e) {
+ throw new HyracksDataException(e);
+ }
++ if (isFeed) {
++ if (feedLogManager == null) {
++ feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits);
++ }
++ feedLogManager.touch();
++ }
+ IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition,
- dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogFileSplits);
++ dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogManager);
+ if (isFeed) {
+ return new FeedAdapter((AbstractFeedDataFlowController) controller);
+ } else {
+ return new GenericAdapter(controller);
+ }
+ }
+
+ private void restoreExternalObjects() throws AsterixException {
+ if (dataSourceFactory == null) {
+ dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
+ // create and configure parser factory
+ if (dataSourceFactory.isIndexible() && (files != null)) {
+ ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
+ }
+ dataSourceFactory.configure(configuration);
+ }
+ if (dataParserFactory == null) {
+ // create and configure parser factory
+ dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
+ dataParserFactory.setRecordType(recordType);
+ dataParserFactory.setMetaType(metaType);
+ dataParserFactory.configure(configuration);
+ }
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration, ARecordType outputType, ARecordType metaType)
+ throws AsterixException {
+ this.recordType = outputType;
+ this.metaType = metaType;
+ this.configuration = configuration;
+ dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
-
+ dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
- prepare();
++ if (dataSourceFactory.isIndexible() && (files != null)) {
++ ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
++ }
++ dataSourceFactory.configure(configuration);
++ dataParserFactory.setRecordType(recordType);
++ dataParserFactory.setMetaType(metaType);
++ dataParserFactory.configure(configuration);
+ ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory, dataParserFactory);
+ configureFeedLogManager();
+ nullifyExternalObjects();
+ }
+
+ private void configureFeedLogManager() throws AsterixException {
+ this.isFeed = ExternalDataUtils.isFeed(configuration);
+ if (isFeed) {
+ feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
+ ExternalDataUtils.getFeedName(configuration), dataSourceFactory.getPartitionConstraint());
+ }
+ }
+
+ private void nullifyExternalObjects() {
+ if (ExternalDataUtils.isExternal(configuration.get(ExternalDataConstants.KEY_READER))) {
+ dataSourceFactory = null;
+ }
+ if (ExternalDataUtils.isExternal(configuration.get(ExternalDataConstants.KEY_PARSER))) {
+ dataParserFactory = null;
+ }
+ }
+
- private void prepare() throws AsterixException {
- if (dataSourceFactory.isIndexible() && (files != null)) {
- ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
- }
- dataSourceFactory.configure(configuration);
- dataParserFactory.setRecordType(recordType);
- dataParserFactory.setMetaType(metaType);
- dataParserFactory.configure(configuration);
- }
-
+ @Override
+ public ARecordType getAdapterOutputType() {
+ return recordType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
index 83d7a3a,0000000..a4c2fae
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
@@@ -1,44 -1,0 +1,50 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.InputStream;
+
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.FeedLogManager;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AsterixInputStream extends InputStream {
+
+ protected AbstractFeedDataFlowController controller;
+ protected FeedLogManager logManager;
++ protected IStreamNotificationHandler notificationHandler;
+
+ public abstract boolean stop() throws Exception;
+
+ public abstract boolean handleException(Throwable th);
+
+ // TODO: Find a better way to send notifications
+ public void setController(AbstractFeedDataFlowController controller) {
+ this.controller = controller;
+ }
+
+ // TODO: Find a better way to send notifications
- public void setFeedLogManager(FeedLogManager logManager) {
++ public void setFeedLogManager(FeedLogManager logManager) throws HyracksDataException {
+ this.logManager = logManager;
+ }
++
++ public void setNotificationHandler(IStreamNotificationHandler notificationHandler) {
++ this.notificationHandler = notificationHandler;
++ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 11e2472,0000000..9cce1c9
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@@ -1,71 -1,0 +1,76 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.FeedLogManager;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * This interface represents a record reader that reads data from external source as a set of records
++ *
+ * @param <T>
+ */
+public interface IRecordReader<T> extends Closeable {
+
+ /**
+ * @return true if the reader has more records remaining, false, otherwise.
+ * @throws Exception
- * if an error takes place
++ * if an error takes place
+ */
+ public boolean hasNext() throws Exception;
+
+ /**
+ * @return the object representing the next record.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public IRawRecord<T> next() throws IOException, InterruptedException;
+
+ /**
+ * used to stop reader from producing more records.
++ *
+ * @return true if the connection to the external source has been suspended, false otherwise.
+ */
+ public boolean stop();
+
+ // TODO: Find a better way to do flushes, this doesn't fit here
+ /**
+ * set a pointer to the controller of the feed. the controller can be used to flush()
+ * parsed records when waiting for more records to be pushed
+ */
+ public void setController(AbstractFeedDataFlowController controller);
+
+ // TODO: Find a better way to perform logging. this doesn't fit here
+ /**
+ * set a pointer to the log manager of the feed. the log manager can be used to log
+ * progress and errors
++ *
++ * @throws HyracksDataException
+ */
- public void setFeedLogManager(FeedLogManager feedLogManager);
++ public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException;
+
+ /**
+ * gives the record reader a chance to recover from IO errors during feed intake
+ */
+ public boolean handleException(Throwable th);
+}