You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2016/04/07 16:59:52 UTC

[17/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master' into hyracks-merge2

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index b102e0b,0000000..e23837b
mode 100644,000000..100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@@ -1,798 -1,0 +1,802 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.test.aql;
 +
 +import java.io.BufferedInputStream;
 +import java.io.BufferedReader;
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.io.FileOutputStream;
 +import java.io.FileReader;
 +import java.io.InputStream;
 +import java.io.InputStreamReader;
 +import java.io.PrintWriter;
 +import java.io.StringWriter;
 +import java.lang.reflect.InvocationTargetException;
 +import java.lang.reflect.Method;
 +import java.nio.charset.StandardCharsets;
 +import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.logging.Level;
 +import java.util.logging.Logger;
 +
 +import org.apache.asterix.common.config.GlobalConfig;
 +import org.apache.asterix.common.utils.ServletUtil.Servlets;
 +import org.apache.asterix.test.server.ITestServer;
 +import org.apache.asterix.test.server.TestServerProvider;
 +import org.apache.asterix.testframework.context.TestCaseContext;
 +import org.apache.asterix.testframework.context.TestCaseContext.OutputFormat;
 +import org.apache.asterix.testframework.context.TestFileContext;
 +import org.apache.asterix.testframework.xml.TestCase.CompilationUnit;
 +import org.apache.asterix.testframework.xml.TestGroup;
 +import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
 +import org.apache.commons.httpclient.HttpClient;
 +import org.apache.commons.httpclient.HttpMethod;
 +import org.apache.commons.httpclient.HttpMethodBase;
 +import org.apache.commons.httpclient.HttpStatus;
 +import org.apache.commons.httpclient.NameValuePair;
 +import org.apache.commons.httpclient.methods.GetMethod;
 +import org.apache.commons.httpclient.methods.PostMethod;
 +import org.apache.commons.httpclient.methods.StringRequestEntity;
 +import org.apache.commons.httpclient.params.HttpMethodParams;
 +import org.apache.commons.io.IOUtils;
++import org.apache.commons.lang3.mutable.MutableInt;
 +import org.json.JSONObject;
 +
 +public class TestExecutor {
 +
 +    /*
 +     * Static variables
 +     */
 +    protected static final Logger LOGGER = Logger.getLogger(TestExecutor.class.getName());
 +    // see
 +    // https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers/417184
 +    private static final long MAX_URL_LENGTH = 2000l;
 +    private static Method managixExecuteMethod = null;
 +    private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>();
 +
 +    /*
 +     * Instance members
 +     */
 +    private String host;
 +    private int port;
 +    private ITestLibrarian librarian;
 +
 +    public TestExecutor() {
 +        host = "127.0.0.1";
 +        port = 19002;
 +    }
 +
 +    public TestExecutor(String host, int port) {
 +        this.host = host;
 +        this.port = port;
 +    }
 +
 +    public void setLibrarian(ITestLibrarian librarian) {
 +        this.librarian = librarian;
 +    }
 +
 +    /**
 +     * Probably does not work well with symlinks.
 +     */
 +    public boolean deleteRec(File path) {
 +        if (path.isDirectory()) {
 +            for (File f : path.listFiles()) {
 +                if (!deleteRec(f)) {
 +                    return false;
 +                }
 +            }
 +        }
 +        return path.delete();
 +    }
 +
 +    public void runScriptAndCompareWithResult(File scriptFile, PrintWriter print, File expectedFile, File actualFile)
 +            throws Exception {
 +        System.err.println("Expected results file: " + expectedFile.toString());
 +        BufferedReader readerExpected = new BufferedReader(
 +                new InputStreamReader(new FileInputStream(expectedFile), "UTF-8"));
 +        BufferedReader readerActual = new BufferedReader(
 +                new InputStreamReader(new FileInputStream(actualFile), "UTF-8"));
 +        String lineExpected, lineActual;
 +        int num = 1;
 +        try {
 +            while ((lineExpected = readerExpected.readLine()) != null) {
 +                lineActual = readerActual.readLine();
 +                // Assert.assertEquals(lineExpected, lineActual);
 +                if (lineActual == null) {
 +                    if (lineExpected.isEmpty()) {
 +                        continue;
 +                    }
 +                    throw new Exception(
 +                            "Result for " + scriptFile + " changed at line " + num + ":\n< " + lineExpected + "\n> ");
 +                }
 +
 +                // Comparing result equality but ignore "Time"-prefixed fields. (for metadata tests.)
 +                String[] lineSplitsExpected = lineExpected.split("Time");
 +                String[] lineSplitsActual = lineActual.split("Time");
 +                if (lineSplitsExpected.length != lineSplitsActual.length) {
 +                    throw new Exception("Result for " + scriptFile + " changed at line " + num + ":\n< " + lineExpected
 +                            + "\n> " + lineActual);
 +                }
 +                if (!equalStrings(lineSplitsExpected[0], lineSplitsActual[0])) {
 +                    throw new Exception("Result for " + scriptFile + " changed at line " + num + ":\n< " + lineExpected
 +                            + "\n> " + lineActual);
 +                }
 +
 +                for (int i = 1; i < lineSplitsExpected.length; i++) {
 +                    String[] splitsByCommaExpected = lineSplitsExpected[i].split(",");
 +                    String[] splitsByCommaActual = lineSplitsActual[i].split(",");
 +                    if (splitsByCommaExpected.length != splitsByCommaActual.length) {
 +                        throw new Exception("Result for " + scriptFile + " changed at line " + num + ":\n< "
 +                                + lineExpected + "\n> " + lineActual);
 +                    }
 +                    for (int j = 1; j < splitsByCommaExpected.length; j++) {
 +                        if (splitsByCommaExpected[j].indexOf("DatasetId") >= 0) {
 +                            // Ignore the field "DatasetId", which is different for different runs.
 +                            // (for metadata tests)
 +                            continue;
 +                        }
 +                        if (!equalStrings(splitsByCommaExpected[j], splitsByCommaActual[j])) {
 +                            throw new Exception("Result for " + scriptFile + " changed at line " + num + ":\n< "
 +                                    + lineExpected + "\n> " + lineActual);
 +                        }
 +                    }
 +                }
 +
 +                ++num;
 +            }
 +            lineActual = readerActual.readLine();
 +            if (lineActual != null) {
 +                throw new Exception("Result for " + scriptFile + " changed at line " + num + ":\n< \n> " + lineActual);
 +            }
 +        } finally {
 +            readerExpected.close();
 +            readerActual.close();
 +        }
 +
 +    }
 +
 +    private boolean equalStrings(String s1, String s2) {
 +        String[] rowsOne = s1.split("\n");
 +        String[] rowsTwo = s2.split("\n");
 +
 +        for (int i = 0; i < rowsOne.length; i++) {
 +            String row1 = rowsOne[i];
 +            String row2 = rowsTwo[i];
 +
 +            if (row1.equals(row2)) {
 +                continue;
 +            }
 +
 +            String[] fields1 = row1.split(" ");
 +            String[] fields2 = row2.split(" ");
 +
 +            boolean bagEncountered = false;
 +            Set<String> bagElements1 = new HashSet<String>();
 +            Set<String> bagElements2 = new HashSet<String>();
 +
 +            for (int j = 0; j < fields1.length; j++) {
 +                if (j >= fields2.length) {
 +                    return false;
 +                } else if (fields1[j].equals(fields2[j])) {
 +                    bagEncountered = fields1[j].equals("{{");
 +                    if (fields1[j].startsWith("}}")) {
 +                        if (!bagElements1.equals(bagElements2)) {
 +                            return false;
 +                        }
 +                        bagEncountered = false;
 +                        bagElements1.clear();
 +                        bagElements2.clear();
 +                    }
 +                    continue;
 +                } else if (fields1[j].indexOf('.') < 0) {
 +                    if (bagEncountered) {
 +                        bagElements1.add(fields1[j].replaceAll(",$", ""));
 +                        bagElements2.add(fields2[j].replaceAll(",$", ""));
 +                        continue;
 +                    }
 +                    return false;
 +                } else {
 +                    // If the fields are floating-point numbers, test them
 +                    // for equality safely
 +                    fields1[j] = fields1[j].split(",")[0];
 +                    fields2[j] = fields2[j].split(",")[0];
 +                    try {
 +                        Double double1 = Double.parseDouble(fields1[j]);
 +                        Double double2 = Double.parseDouble(fields2[j]);
 +                        float float1 = (float) double1.doubleValue();
 +                        float float2 = (float) double2.doubleValue();
 +
 +                        if (Math.abs(float1 - float2) == 0) {
 +                            continue;
 +                        } else {
 +                            return false;
 +                        }
 +                    } catch (NumberFormatException ignored) {
 +                        // Guess they weren't numbers - must simply not be equal
 +                        return false;
 +                    }
 +                }
 +            }
 +        }
 +        return true;
 +    }
 +
 +    // For tests where you simply want the byte-for-byte output.
 +    private static void writeOutputToFile(File actualFile, InputStream resultStream) throws Exception {
 +        try (FileOutputStream out = new FileOutputStream(actualFile)) {
 +            IOUtils.copy(resultStream, out);
 +        }
 +    }
 +
 +    private int executeHttpMethod(HttpMethod method) throws Exception {
 +        HttpClient client = new HttpClient();
 +        int statusCode;
 +        try {
 +            statusCode = client.executeMethod(method);
 +        } catch (Exception e) {
 +            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
 +            e.printStackTrace();
 +            throw e;
 +        }
 +        if (statusCode != HttpStatus.SC_OK) {
 +            // QQQ For now, we are indeed assuming we get back JSON errors.
 +            // In future this may be changed depending on the requested
 +            // output format sent to the servlet.
 +            String errorBody = method.getResponseBodyAsString();
 +            JSONObject result = new JSONObject(errorBody);
 +            String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
 +                    result.getString("stacktrace") };
 +            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
 +            String exceptionMsg = "HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
 +                    + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2];
 +            throw new Exception(exceptionMsg);
 +        }
 +        return statusCode;
 +    }
 +
 +    // Executes Query and returns results as JSONArray
 +    public InputStream executeQuery(String str, OutputFormat fmt, String url, List<CompilationUnit.Parameter> params)
 +            throws Exception {
 +        HttpMethodBase method = null;
 +        if (str.length() + url.length() < MAX_URL_LENGTH) {
 +            // Use GET for small-ish queries
 +            method = new GetMethod(url);
 +            NameValuePair[] parameters = new NameValuePair[params.size() + 1];
 +            parameters[0] = new NameValuePair("query", str);
 +            int i = 1;
 +            for (CompilationUnit.Parameter param : params) {
 +                parameters[i++] = new NameValuePair(param.getName(), param.getValue());
 +            }
 +            method.setQueryString(parameters);
 +        } else {
 +            // Use POST for bigger ones to avoid 413 FULL_HEAD
 +            // QQQ POST API doesn't allow encoding additional parameters
 +            method = new PostMethod(url);
 +            ((PostMethod) method).setRequestEntity(new StringRequestEntity(str));
 +        }
 +
 +        // Set accepted output response type
 +        method.setRequestHeader("Accept", fmt.mimeType());
 +        // Provide custom retry handler is necessary
 +        method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false));
 +        executeHttpMethod(method);
 +        return method.getResponseBodyAsStream();
 +    }
 +
 +    public InputStream executeClusterStateQuery(OutputFormat fmt, String url) throws Exception {
 +        HttpMethodBase method = new GetMethod(url);
 +
 +        // Set accepted output response type
 +        method.setRequestHeader("Accept", fmt.mimeType());
 +        // Provide custom retry handler is necessary
 +        method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false));
 +        executeHttpMethod(method);
 +        return method.getResponseBodyAsStream();
 +    }
 +
 +    // To execute Update statements
 +    // Insert and Delete statements are executed here
 +    public void executeUpdate(String str, String url) throws Exception {
 +        // Create a method instance.
 +        PostMethod method = new PostMethod(url);
 +        method.setRequestEntity(new StringRequestEntity(str));
 +
 +        // Provide custom retry handler is necessary
 +        method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false));
 +
 +        // Execute the method.
 +        executeHttpMethod(method);
 +    }
 +
 +    // Executes AQL in either async or async-defer mode.
 +    public InputStream executeAnyAQLAsync(String str, boolean defer, OutputFormat fmt, String url) throws Exception {
 +        // Create a method instance.
 +        PostMethod method = new PostMethod(url);
 +        if (defer) {
 +            method.setQueryString(new NameValuePair[] { new NameValuePair("mode", "asynchronous-deferred") });
 +        } else {
 +            method.setQueryString(new NameValuePair[] { new NameValuePair("mode", "asynchronous") });
 +        }
 +        method.setRequestEntity(new StringRequestEntity(str));
 +        method.setRequestHeader("Accept", fmt.mimeType());
 +
 +        // Provide custom retry handler is necessary
 +        method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false));
 +        executeHttpMethod(method);
 +        InputStream resultStream = method.getResponseBodyAsStream();
 +
 +        String theHandle = IOUtils.toString(resultStream, "UTF-8");
 +
 +        // take the handle and parse it so results can be retrieved
 +        InputStream handleResult = getHandleResult(theHandle, fmt);
 +        return handleResult;
 +    }
 +
 +    private InputStream getHandleResult(String handle, OutputFormat fmt) throws Exception {
 +        final String url = "http://" + host + ":" + port + Servlets.QUERY_RESULT.getPath();
 +
 +        // Create a method instance.
 +        GetMethod method = new GetMethod(url);
 +        method.setQueryString(new NameValuePair[] { new NameValuePair("handle", handle) });
 +        method.setRequestHeader("Accept", fmt.mimeType());
 +
 +        // Provide custom retry handler is necessary
 +        method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false));
 +
 +        executeHttpMethod(method);
 +        return method.getResponseBodyAsStream();
 +    }
 +
 +    // To execute DDL and Update statements
 +    // create type statement
 +    // create dataset statement
 +    // create index statement
 +    // create dataverse statement
 +    // create function statement
 +    public void executeDDL(String str, String url) throws Exception {
 +        // Create a method instance.
 +        PostMethod method = new PostMethod(url);
 +        method.setRequestEntity(new StringRequestEntity(str));
 +        // Provide custom retry handler is necessary
 +        method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false));
 +
 +        // Execute the method.
 +        executeHttpMethod(method);
 +    }
 +
 +    // Method that reads a DDL/Update/Query File
 +    // and returns the contents as a string
 +    // This string is later passed to REST API for execution.
 +    public String readTestFile(File testFile) throws Exception {
 +        BufferedReader reader = new BufferedReader(new FileReader(testFile));
 +        String line = null;
 +        StringBuilder stringBuilder = new StringBuilder();
 +        String ls = System.getProperty("line.separator");
 +        while ((line = reader.readLine()) != null) {
 +            stringBuilder.append(line);
 +            stringBuilder.append(ls);
 +        }
 +        reader.close();
 +        return stringBuilder.toString();
 +    }
 +
 +    public static void executeManagixCommand(String command) throws ClassNotFoundException, NoSuchMethodException,
 +            SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
 +        if (managixExecuteMethod == null) {
 +            Class<?> clazz = Class.forName("org.apache.asterix.installer.test.AsterixInstallerIntegrationUtil");
 +            managixExecuteMethod = clazz.getMethod("executeCommand", String.class);
 +        }
 +        managixExecuteMethod.invoke(null, command);
 +    }
 +
 +    public static String executeScript(ProcessBuilder pb, String scriptPath) throws Exception {
 +        pb.command(scriptPath);
 +        Process p = pb.start();
 +        p.waitFor();
 +        return getProcessOutput(p);
 +    }
 +
 +    private static String executeVagrantScript(ProcessBuilder pb, String node, String scriptName) throws Exception {
 +        pb.command("vagrant", "ssh", node, "--", pb.environment().get("SCRIPT_HOME") + scriptName);
 +        Process p = pb.start();
 +        p.waitFor();
 +        InputStream input = p.getInputStream();
 +        return IOUtils.toString(input, StandardCharsets.UTF_8.name());
 +    }
 +
 +    private static String executeVagrantManagix(ProcessBuilder pb, String command) throws Exception {
 +        pb.command("vagrant", "ssh", "cc", "--", pb.environment().get("MANAGIX_HOME") + command);
 +        Process p = pb.start();
 +        p.waitFor();
 +        InputStream input = p.getInputStream();
 +        return IOUtils.toString(input, StandardCharsets.UTF_8.name());
 +    }
 +
 +    private static String getScriptPath(String queryPath, String scriptBasePath, String scriptFileName) {
 +        String targetWord = "queries" + File.separator;
 +        int targetWordSize = targetWord.lastIndexOf(File.separator);
 +        int beginIndex = queryPath.lastIndexOf(targetWord) + targetWordSize;
 +        int endIndex = queryPath.lastIndexOf(File.separator);
 +        String prefix = queryPath.substring(beginIndex, endIndex);
 +        String scriptPath = scriptBasePath + prefix + File.separator + scriptFileName;
 +        return scriptPath;
 +    }
 +
 +    private static String getProcessOutput(Process p) throws Exception {
 +        StringBuilder s = new StringBuilder();
 +        BufferedInputStream bisIn = new BufferedInputStream(p.getInputStream());
 +        StringWriter writerIn = new StringWriter();
 +        IOUtils.copy(bisIn, writerIn, "UTF-8");
 +        s.append(writerIn.toString());
 +
 +        BufferedInputStream bisErr = new BufferedInputStream(p.getErrorStream());
 +        StringWriter writerErr = new StringWriter();
 +        IOUtils.copy(bisErr, writerErr, "UTF-8");
 +        s.append(writerErr.toString());
 +        if (writerErr.toString().length() > 0) {
 +            StringBuilder sbErr = new StringBuilder();
 +            sbErr.append("script execution failed - error message:\n");
 +            sbErr.append("-------------------------------------------\n");
 +            sbErr.append(s.toString());
 +            sbErr.append("-------------------------------------------\n");
 +            LOGGER.info(sbErr.toString().trim());
 +            throw new Exception(s.toString().trim());
 +        }
 +        return s.toString();
 +    }
 +
 +    public void executeTest(String actualPath, TestCaseContext testCaseCtx, ProcessBuilder pb,
 +            boolean isDmlRecoveryTest) throws Exception {
 +        executeTest(actualPath, testCaseCtx, pb, isDmlRecoveryTest, null);
 +    }
 +
++    public void executeTest(TestCaseContext testCaseCtx, TestFileContext ctx, String statement,
++            boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount,
++            List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath) throws Exception {
++        File qbcFile;
++        boolean failed = false;
++        File expectedResultFile;
++        switch (ctx.getType()) {
++            case "ddl":
++                if (ctx.getFile().getName().endsWith("aql")) {
++                    executeDDL(statement, "http://" + host + ":" + port + Servlets.AQL_DDL.getPath());
++                } else {
++                    executeDDL(statement, "http://" + host + ":" + port + Servlets.SQLPP_DDL.getPath());
++                }
++                break;
++            case "update":
++                // isDmlRecoveryTest: set IP address
++                if (isDmlRecoveryTest && statement.contains("nc1://")) {
++                    statement = statement.replaceAll("nc1://", "127.0.0.1://../../../../../../asterix-app/");
++                }
++                if (ctx.getFile().getName().endsWith("aql")) {
++                    executeUpdate(statement, "http://" + host + ":" + port + Servlets.AQL_UPDATE.getPath());
++                } else {
++                    executeUpdate(statement, "http://" + host + ":" + port + Servlets.SQLPP_UPDATE.getPath());
++                }
++                break;
++            case "query":
++            case "async":
++            case "asyncdefer":
++                // isDmlRecoveryTest: insert Crash and Recovery
++                if (isDmlRecoveryTest) {
++                    executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery"
++                            + File.separator + "kill_cc_and_nc.sh");
++                    executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery"
++                            + File.separator + "stop_and_start.sh");
++                }
++                InputStream resultStream = null;
++                OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
++                if (ctx.getFile().getName().endsWith("aql")) {
++                    if (ctx.getType().equalsIgnoreCase("query")) {
++                        resultStream = executeQuery(statement, fmt,
++                                "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(), cUnit.getParameter());
++                    } else if (ctx.getType().equalsIgnoreCase("async")) {
++                        resultStream = executeAnyAQLAsync(statement, false, fmt,
++                                "http://" + host + ":" + port + Servlets.AQL.getPath());
++                    } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
++                        resultStream = executeAnyAQLAsync(statement, true, fmt,
++                                "http://" + host + ":" + port + Servlets.AQL.getPath());
++                    }
++                } else {
++                    if (ctx.getType().equalsIgnoreCase("query")) {
++                        resultStream = executeQuery(statement, fmt,
++                                "http://" + host + ":" + port + Servlets.SQLPP_QUERY.getPath(), cUnit.getParameter());
++                    } else if (ctx.getType().equalsIgnoreCase("async")) {
++                        resultStream = executeAnyAQLAsync(statement, false, fmt,
++                                "http://" + host + ":" + port + Servlets.SQLPP.getPath());
++                    } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
++                        resultStream = executeAnyAQLAsync(statement, true, fmt,
++                                "http://" + host + ":" + port + Servlets.SQLPP.getPath());
++                    }
++                }
++
++                if (queryCount.intValue() >= expectedResultFileCtxs.size()) {
++                    throw new IllegalStateException("no result file for " + testFile.toString() + "; queryCount: "
++                            + queryCount + ", filectxs.size: " + expectedResultFileCtxs.size());
++                }
++                expectedResultFile = expectedResultFileCtxs.get(queryCount.intValue()).getFile();
++
++                File actualResultFile = testCaseCtx.getActualResultFile(cUnit, new File(actualPath));
++                actualResultFile.getParentFile().mkdirs();
++                writeOutputToFile(actualResultFile, resultStream);
++
++                runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), expectedResultFile,
++                        actualResultFile);
++                queryCount.increment();
++                break;
++            case "mgx":
++                executeManagixCommand(statement);
++                break;
++            case "txnqbc": // qbc represents query before crash
++                resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
++                        "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(), cUnit.getParameter());
++                qbcFile = getTestCaseQueryBeforeCrashFile(actualPath, testCaseCtx, cUnit);
++                qbcFile.getParentFile().mkdirs();
++                writeOutputToFile(qbcFile, resultStream);
++                break;
++            case "txnqar": // qar represents query after recovery
++                resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
++                        "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(), cUnit.getParameter());
++                File qarFile = new File(actualPath + File.separator
++                        + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName()
++                        + "_qar.adm");
++                qarFile.getParentFile().mkdirs();
++                writeOutputToFile(qarFile, resultStream);
++                qbcFile = getTestCaseQueryBeforeCrashFile(actualPath, testCaseCtx, cUnit);
++                runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), qbcFile, qarFile);
++                break;
++            case "txneu": // eu represents erroneous update
++                try {
++                    executeUpdate(statement, "http://" + host + ":" + port + Servlets.AQL_UPDATE.getPath());
++                } catch (Exception e) {
++                    // An exception is expected.
++                    failed = true;
++                    e.printStackTrace();
++                }
++                if (!failed) {
++                    throw new Exception("Test \"" + testFile + "\" FAILED!\n  An exception" + "is expected.");
++                }
++                System.err.println("...but that was expected.");
++                break;
++            case "script":
++                try {
++                    String output = executeScript(pb, getScriptPath(testFile.getAbsolutePath(),
++                            pb.environment().get("SCRIPT_HOME"), statement.trim()));
++                    if (output.contains("ERROR")) {
++                        throw new Exception(output);
++                    }
++                } catch (Exception e) {
++                    throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
++                }
++                break;
++            case "sleep":
++                String[] lines = statement.split("\n");
++                Thread.sleep(Long.parseLong(lines[lines.length - 1].trim()));
++                break;
++            case "errddl": // a ddlquery that expects error
++                try {
++                    executeDDL(statement, "http://" + host + ":" + port + Servlets.AQL_DDL.getPath());
++                } catch (Exception e) {
++                    // expected error happens
++                    failed = true;
++                    e.printStackTrace();
++                }
++                if (!failed) {
++                    throw new Exception("Test \"" + testFile + "\" FAILED!\n  An exception is expected.");
++                }
++                System.err.println("...but that was expected.");
++                break;
++            case "vscript": // a script that will be executed on a vagrant virtual node
++                try {
++                    String[] command = statement.trim().split(" ");
++                    if (command.length != 2) {
++                        throw new Exception("invalid vagrant script format");
++                    }
++                    String nodeId = command[0];
++                    String scriptName = command[1];
++                    String output = executeVagrantScript(pb, nodeId, scriptName);
++                    if (output.contains("ERROR")) {
++                        throw new Exception(output);
++                    }
++                } catch (Exception e) {
++                    throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
++                }
++                break;
++            case "vmgx": // a managix command that will be executed on vagrant cc node
++                try {
++                    String output = executeVagrantManagix(pb, statement);
++                    if (output.contains("ERROR")) {
++                        throw new Exception(output);
++                    }
++                } catch (Exception e) {
++                    throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
++                }
++                break;
++            case "cstate": // cluster state query
++                try {
++                    fmt = OutputFormat.forCompilationUnit(cUnit);
++                    resultStream = executeClusterStateQuery(fmt,
++                            "http://" + host + ":" + port + Servlets.CLUSTER_STATE.getPath());
++                    expectedResultFile = expectedResultFileCtxs.get(queryCount.intValue()).getFile();
++                    actualResultFile = testCaseCtx.getActualResultFile(cUnit, new File(actualPath));
++                    actualResultFile.getParentFile().mkdirs();
++                    writeOutputToFile(actualResultFile, resultStream);
++                    runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), expectedResultFile,
++                            actualResultFile);
++                    queryCount.increment();
++                } catch (Exception e) {
++                    throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
++                }
++                break;
++            case "server": // (start <test server name> <port>
++                           // [<arg1>][<arg2>][<arg3>]...|stop (<port>|all))
++                try {
++                    lines = statement.trim().split("\n");
++                    String[] command = lines[lines.length - 1].trim().split(" ");
++                    if (command.length < 2) {
++                        throw new Exception("invalid server command format. expected format ="
++                                + " (start <test server name> <port> [<arg1>][<arg2>][<arg3>]"
++                                + "...|stop (<port>|all))");
++                    }
++                    String action = command[0];
++                    if (action.equals("start")) {
++                        if (command.length < 3) {
++                            throw new Exception("invalid server start command. expected format ="
++                                    + " (start <test server name> <port> [<arg1>][<arg2>][<arg3>]...");
++                        }
++                        String name = command[1];
++                        Integer port = new Integer(command[2]);
++                        if (runningTestServers.containsKey(port)) {
++                            throw new Exception("server with port " + port + " is already running");
++                        }
++                        ITestServer server = TestServerProvider.createTestServer(name, port);
++                        server.configure(Arrays.copyOfRange(command, 3, command.length));
++                        server.start();
++                        runningTestServers.put(port, server);
++                    } else if (action.equals("stop")) {
++                        String target = command[1];
++                        if (target.equals("all")) {
++                            for (ITestServer server : runningTestServers.values()) {
++                                server.stop();
++                            }
++                            runningTestServers.clear();
++                        } else {
++                            Integer port = new Integer(command[1]);
++                            ITestServer server = runningTestServers.get(port);
++                            if (server == null) {
++                                throw new Exception("no server is listening to port " + port);
++                            }
++                            server.stop();
++                            runningTestServers.remove(port);
++                        }
++                    } else {
++                        throw new Exception("unknown server action");
++                    }
++                } catch (Exception e) {
++                    throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
++                }
++                break;
++            case "lib": // expected format <dataverse-name> <library-name>
++                        // <library-directory>
++                        // TODO: make this case work well with entity names containing spaces by
++                        // looking for \"
++                lines = statement.split("\n");
++                String lastLine = lines[lines.length - 1];
++                String[] command = lastLine.trim().split(" ");
++                if (command.length < 3) {
++                    throw new Exception("invalid library format");
++                }
++                String dataverse = command[1];
++                String library = command[2];
++                switch (command[0]) {
++                    case "install":
++                        if (command.length != 4) {
++                            throw new Exception("invalid library format");
++                        }
++                        String libPath = command[3];
++                        librarian.install(dataverse, library, libPath);
++                        break;
++                    case "uninstall":
++                        if (command.length != 3) {
++                            throw new Exception("invalid library format");
++                        }
++                        librarian.uninstall(dataverse, library);
++                        break;
++                    default:
++                        throw new Exception("invalid library format");
++                }
++                break;
++            default:
++                throw new IllegalArgumentException("No statements of type " + ctx.getType());
++        }
++    }
++
 +    public void executeTest(String actualPath, TestCaseContext testCaseCtx, ProcessBuilder pb,
 +            boolean isDmlRecoveryTest, TestGroup failedGroup) throws Exception {
- 
 +        File testFile;
-         File expectedResultFile;
 +        String statement;
 +        List<TestFileContext> expectedResultFileCtxs;
 +        List<TestFileContext> testFileCtxs;
-         File qbcFile = null;
-         File qarFile = null;
-         int queryCount = 0;
++        MutableInt queryCount = new MutableInt(0);
 +        int numOfErrors = 0;
 +        int numOfFiles = 0;
- 
 +        List<CompilationUnit> cUnits = testCaseCtx.getTestCase().getCompilationUnit();
 +        for (CompilationUnit cUnit : cUnits) {
 +            LOGGER.info(
 +                    "Starting [TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " ... ");
 +            testFileCtxs = testCaseCtx.getTestFiles(cUnit);
 +            expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit);
 +            for (TestFileContext ctx : testFileCtxs) {
 +                numOfFiles++;
 +                testFile = ctx.getFile();
 +                statement = readTestFile(testFile);
-                 boolean failed = false;
 +                try {
-                     switch (ctx.getType()) {
-                         case "ddl":
-                             if (ctx.getFile().getName().endsWith("aql")) {
-                                 executeDDL(statement, "http://" + host + ":" + port + Servlets.AQL_DDL.getPath());
-                             } else {
-                                 executeDDL(statement, "http://" + host + ":" + port + Servlets.SQLPP_DDL.getPath());
-                             }
-                             break;
-                         case "update":
-                             // isDmlRecoveryTest: set IP address
-                             if (isDmlRecoveryTest && statement.contains("nc1://")) {
-                                 statement = statement.replaceAll("nc1://",
-                                         "127.0.0.1://../../../../../../asterix-app/");
-                             }
-                             if (ctx.getFile().getName().endsWith("aql")) {
-                                 executeUpdate(statement, "http://" + host + ":" + port + Servlets.AQL_UPDATE.getPath());
-                             } else {
-                                 executeUpdate(statement,
-                                         "http://" + host + ":" + port + Servlets.SQLPP_UPDATE.getPath());
-                             }
-                             break;
-                         case "query":
-                         case "async":
-                         case "asyncdefer":
-                             // isDmlRecoveryTest: insert Crash and Recovery
-                             if (isDmlRecoveryTest) {
-                                 executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery"
-                                         + File.separator + "kill_cc_and_nc.sh");
-                                 executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery"
-                                         + File.separator + "stop_and_start.sh");
-                             }
-                             InputStream resultStream = null;
-                             OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
-                             if (ctx.getFile().getName().endsWith("aql")) {
-                                 if (ctx.getType().equalsIgnoreCase("query")) {
-                                     resultStream = executeQuery(statement, fmt,
-                                             "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(),
-                                             cUnit.getParameter());
-                                 } else if (ctx.getType().equalsIgnoreCase("async")) {
-                                     resultStream = executeAnyAQLAsync(statement, false, fmt,
-                                             "http://" + host + ":" + port + Servlets.AQL.getPath());
-                                 } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
-                                     resultStream = executeAnyAQLAsync(statement, true, fmt,
-                                             "http://" + host + ":" + port + Servlets.AQL.getPath());
-                                 }
-                             } else {
-                                 if (ctx.getType().equalsIgnoreCase("query")) {
-                                     resultStream = executeQuery(statement, fmt,
-                                             "http://" + host + ":" + port + Servlets.SQLPP_QUERY.getPath(),
-                                             cUnit.getParameter());
-                                 } else if (ctx.getType().equalsIgnoreCase("async")) {
-                                     resultStream = executeAnyAQLAsync(statement, false, fmt,
-                                             "http://" + host + ":" + port + Servlets.SQLPP.getPath());
-                                 } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
-                                     resultStream = executeAnyAQLAsync(statement, true, fmt,
-                                             "http://" + host + ":" + port + Servlets.SQLPP.getPath());
-                                 }
-                             }
- 
-                             if (queryCount >= expectedResultFileCtxs.size()) {
-                                 throw new IllegalStateException(
-                                         "no result file for " + testFile.toString() + "; queryCount: " + queryCount
-                                                 + ", filectxs.size: " + expectedResultFileCtxs.size());
-                             }
-                             expectedResultFile = expectedResultFileCtxs.get(queryCount).getFile();
- 
-                             File actualResultFile = testCaseCtx.getActualResultFile(cUnit, new File(actualPath));
-                             actualResultFile.getParentFile().mkdirs();
-                             writeOutputToFile(actualResultFile, resultStream);
- 
-                             runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), expectedResultFile,
-                                     actualResultFile);
-                             queryCount++;
-                             break;
-                         case "mgx":
-                             executeManagixCommand(statement);
-                             break;
-                         case "txnqbc": // qbc represents query before crash
-                             resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
-                                     "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(), cUnit.getParameter());
-                             qbcFile = new File(actualPath + File.separator
-                                     + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
-                                     + cUnit.getName() + "_qbc.adm");
-                             qbcFile.getParentFile().mkdirs();
-                             writeOutputToFile(qbcFile, resultStream);
-                             break;
-                         case "txnqar": // qar represents query after recovery
-                             resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
-                                     "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(), cUnit.getParameter());
-                             qarFile = new File(actualPath + File.separator
-                                     + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
-                                     + cUnit.getName() + "_qar.adm");
-                             qarFile.getParentFile().mkdirs();
-                             writeOutputToFile(qarFile, resultStream);
-                             runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), qbcFile, qarFile);
-                             break;
-                         case "txneu": // eu represents erroneous update
-                             try {
-                                 executeUpdate(statement, "http://" + host + ":" + port + Servlets.AQL_UPDATE.getPath());
-                             } catch (Exception e) {
-                                 // An exception is expected.
-                                 failed = true;
-                                 e.printStackTrace();
-                             }
-                             if (!failed) {
-                                 throw new Exception(
-                                         "Test \"" + testFile + "\" FAILED!\n  An exception" + "is expected.");
-                             }
-                             System.err.println("...but that was expected.");
-                             break;
-                         case "script":
-                             try {
-                                 String output = executeScript(pb, getScriptPath(testFile.getAbsolutePath(),
-                                         pb.environment().get("SCRIPT_HOME"), statement.trim()));
-                                 if (output.contains("ERROR")) {
-                                     throw new Exception(output);
-                                 }
-                             } catch (Exception e) {
-                                 throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
-                             }
-                             break;
-                         case "sleep":
-                             String[] lines = statement.split("\n");
-                             Thread.sleep(Long.parseLong(lines[lines.length - 1].trim()));
-                             break;
-                         case "errddl": // a ddlquery that expects error
-                             try {
-                                 executeDDL(statement, "http://" + host + ":" + port + Servlets.AQL_DDL.getPath());
-                             } catch (Exception e) {
-                                 // expected error happens
-                                 failed = true;
-                                 e.printStackTrace();
-                             }
-                             if (!failed) {
-                                 throw new Exception("Test \"" + testFile + "\" FAILED!\n  An exception is expected.");
-                             }
-                             System.err.println("...but that was expected.");
-                             break;
-                         case "vscript": // a script that will be executed on a vagrant virtual node
-                             try {
-                                 String[] command = statement.trim().split(" ");
-                                 if (command.length != 2) {
-                                     throw new Exception("invalid vagrant script format");
-                                 }
-                                 String nodeId = command[0];
-                                 String scriptName = command[1];
-                                 String output = executeVagrantScript(pb, nodeId, scriptName);
-                                 if (output.contains("ERROR")) {
-                                     throw new Exception(output);
-                                 }
-                             } catch (Exception e) {
-                                 throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
-                             }
-                             break;
-                         case "vmgx": // a managix command that will be executed on vagrant cc node
-                             try {
-                                 String output = executeVagrantManagix(pb, statement);
-                                 if (output.contains("ERROR")) {
-                                     throw new Exception(output);
-                                 }
-                             } catch (Exception e) {
-                                 throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
-                             }
-                             break;
-                         case "cstate": // cluster state query
-                             try {
-                                 fmt = OutputFormat.forCompilationUnit(cUnit);
-                                 resultStream = executeClusterStateQuery(fmt,
-                                         "http://" + host + ":" + port + Servlets.CLUSTER_STATE.getPath());
-                                 expectedResultFile = expectedResultFileCtxs.get(queryCount).getFile();
-                                 actualResultFile = testCaseCtx.getActualResultFile(cUnit, new File(actualPath));
-                                 actualResultFile.getParentFile().mkdirs();
-                                 writeOutputToFile(actualResultFile, resultStream);
-                                 runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), expectedResultFile,
-                                         actualResultFile);
-                                 queryCount++;
-                             } catch (Exception e) {
-                                 throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
-                             }
-                             break;
-                         case "server": // (start <test server name> <port>
-                                        // [<arg1>][<arg2>][<arg3>]...|stop (<port>|all))
-                             try {
-                                 lines = statement.trim().split("\n");
-                                 String[] command = lines[lines.length - 1].trim().split(" ");
-                                 if (command.length < 2) {
-                                     throw new Exception("invalid server command format. expected format ="
-                                             + " (start <test server name> <port> [<arg1>][<arg2>][<arg3>]"
-                                             + "...|stop (<port>|all))");
-                                 }
-                                 String action = command[0];
-                                 if (action.equals("start")) {
-                                     if (command.length < 3) {
-                                         throw new Exception("invalid server start command. expected format ="
-                                                 + " (start <test server name> <port> [<arg1>][<arg2>][<arg3>]...");
-                                     }
-                                     String name = command[1];
-                                     Integer port = new Integer(command[2]);
-                                     if (runningTestServers.containsKey(port)) {
-                                         throw new Exception("server with port " + port + " is already running");
-                                     }
-                                     ITestServer server = TestServerProvider.createTestServer(name, port);
-                                     server.configure(Arrays.copyOfRange(command, 3, command.length));
-                                     server.start();
-                                     runningTestServers.put(port, server);
-                                 } else if (action.equals("stop")) {
-                                     String target = command[1];
-                                     if (target.equals("all")) {
-                                         for (ITestServer server : runningTestServers.values()) {
-                                             server.stop();
-                                         }
-                                         runningTestServers.clear();
-                                     } else {
-                                         Integer port = new Integer(command[1]);
-                                         ITestServer server = runningTestServers.get(port);
-                                         if (server == null) {
-                                             throw new Exception("no server is listening to port " + port);
-                                         }
-                                         server.stop();
-                                         runningTestServers.remove(port);
-                                     }
-                                 } else {
-                                     throw new Exception("unknown server action");
-                                 }
-                             } catch (Exception e) {
-                                 throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
-                             }
-                             break;
-                         case "lib": // expected format <dataverse-name> <library-name>
-                                     // <library-directory>
-                                     // TODO: make this case work well with entity names containing spaces by
-                                     // looking for \"
-                             lines = statement.split("\n");
-                             String lastLine = lines[lines.length - 1];
-                             String[] command = lastLine.trim().split(" ");
-                             if (command.length < 3) {
-                                 throw new Exception("invalid library format");
-                             }
-                             String dataverse = command[1];
-                             String library = command[2];
-                             switch (command[0]) {
-                                 case "install":
-                                     if (command.length != 4) {
-                                         throw new Exception("invalid library format");
-                                     }
-                                     String libPath = command[3];
-                                     librarian.install(dataverse, library, libPath);
-                                     break;
-                                 case "uninstall":
-                                     if (command.length != 3) {
-                                         throw new Exception("invalid library format");
-                                     }
-                                     librarian.uninstall(dataverse, library);
-                                     break;
-                                 default:
-                                     throw new Exception("invalid library format");
-                             }
-                             break;
-                         default:
-                             throw new IllegalArgumentException("No statements of type " + ctx.getType());
-                     }
- 
++                    executeTest(testCaseCtx, ctx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
++                            expectedResultFileCtxs, testFile, actualPath);
 +                } catch (Exception e) {
 +                    System.err.println("testFile " + testFile.toString() + " raised an exception:");
 +                    boolean unExpectedFailure = false;
 +                    numOfErrors++;
 +                    if (cUnit.getExpectedError().size() < numOfErrors) {
 +                        unExpectedFailure = true;
 +                    } else {
 +                        // Get the expected exception
 +                        String expectedError = cUnit.getExpectedError().get(numOfErrors - 1);
 +                        if (e.toString().contains(expectedError)) {
 +                            System.err.println("...but that was expected.");
 +                        } else {
 +                            unExpectedFailure = true;
 +                        }
 +                    }
 +                    if (unExpectedFailure) {
 +                        e.printStackTrace();
 +                        System.err.println("...Unexpected!");
 +                        if (failedGroup != null) {
 +                            failedGroup.getTestCase().add(testCaseCtx.getTestCase());
 +                        }
 +                        throw new Exception("Test \"" + testFile + "\" FAILED!", e);
 +                    }
 +                } finally {
 +                    if (numOfFiles == testFileCtxs.size() && numOfErrors < cUnit.getExpectedError().size()) {
 +                        System.err.println("...Unexpected!");
 +                        Exception e = new Exception(
 +                                "Test \"" + cUnit.getName() + "\" FAILED!\nExpected error was not thrown...");
 +                        e.printStackTrace();
 +                        throw e;
 +                    } else if (numOfFiles == testFileCtxs.size()) {
 +                        LOGGER.info("[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName()
 +                                + " PASSED ");
 +                    }
 +                }
 +            }
 +        }
 +    }
++
++    private static File getTestCaseQueryBeforeCrashFile(String actualPath, TestCaseContext testCaseCtx,
++            CompilationUnit cUnit) {
++        return new File(
++                actualPath + File.separator + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
++                        + cUnit.getName() + "_qbc.adm");
++    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/pom.xml
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/pom.xml
index 9a8540f,0000000..8c59cc4
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@@ -1,298 -1,0 +1,298 @@@
 +<!-- 
 + ! Licensed to the Apache Software Foundation (ASF) under one
 + ! or more contributor license agreements.  See the NOTICE file
 + ! distributed with this work for additional information
 + ! regarding copyright ownership.  The ASF licenses this file
 + ! to you under the Apache License, Version 2.0 (the
 + ! "License"); you may not use this file except in compliance
 + ! with the License.  You may obtain a copy of the License at
 + !
 + !   http://www.apache.org/licenses/LICENSE-2.0
 + !
 + ! Unless required by applicable law or agreed to in writing,
 + ! software distributed under the License is distributed on an
 + ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + ! KIND, either express or implied.  See the License for the
 + ! specific language governing permissions and limitations
 + ! under the License.
 + !-->
 +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 +    <modelVersion>4.0.0</modelVersion>
 +    <parent>
 +        <artifactId>apache-asterixdb</artifactId>
 +        <groupId>org.apache.asterix</groupId>
 +        <version>0.8.9-SNAPSHOT</version>
 +    </parent>
 +    <licenses>
 +        <license>
 +            <name>Apache License, Version 2.0</name>
 +            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
 +            <distribution>repo</distribution>
 +            <comments>A business-friendly OSS license</comments>
 +        </license>
 +    </licenses>
 +    <artifactId>asterix-external-data</artifactId>
 +    <properties>
 +        <appendedResourcesDirectory>${basedir}/../src/main/appended-resources</appendedResourcesDirectory>
 +    </properties>
 +    <build>
 +        <plugins>
 +            <plugin>
 +                <groupId>org.apache.asterix</groupId>
 +                <artifactId>lexer-generator-maven-plugin</artifactId>
 +                <version>0.8.9-SNAPSHOT</version>
 +                <configuration>
 +                    <grammarFile>src/main/resources/adm.grammar</grammarFile>
 +                    <outputDir>${project.build.directory}/generated-sources/org/apache/asterix/runtime/operators/file/adm</outputDir>
 +                </configuration>
 +                <executions>
 +                    <execution>
 +                        <id>generate-lexer</id>
 +                        <phase>generate-sources</phase>
 +                        <goals>
 +                            <goal>generate-lexer</goal>
 +                        </goals>
 +                    </execution>
 +                </executions>
 +            </plugin>
 +            <plugin>
 +                <groupId>org.codehaus.mojo</groupId>
 +                <artifactId>build-helper-maven-plugin</artifactId>
 +                <version>1.9</version>
 +                <executions>
 +                    <execution>
 +                        <id>add-source</id>
 +                        <phase>generate-sources</phase>
 +                        <goals>
 +                            <goal>add-source</goal>
 +                        </goals>
 +                        <configuration>
 +                            <sources>
 +                                <source>${project.build.directory}/generated-sources/</source>
 +                            </sources>
 +                        </configuration>
 +                    </execution>
 +                </executions>
 +            </plugin>
 +            <plugin>
 +                <groupId>org.jvnet.jaxb2.maven2</groupId>
 +                <artifactId>maven-jaxb2-plugin</artifactId>
 +                <version>0.9.0</version>
 +                <executions>
 +                    <execution>
 +                        <id>configuration</id>
 +                        <goals>
 +                            <goal>generate</goal>
 +                        </goals>
 +                        <configuration>
 +                            <schemaDirectory>src/main/resources/schema</schemaDirectory>
 +                            <schemaIncludes>
 +                                <include>library.xsd</include>
 +                            </schemaIncludes>
 +                            <generatePackage>org.apache.asterix.external.library</generatePackage>
 +                            <generateDirectory>${project.build.directory}/generated-sources/configuration</generateDirectory>
 +                        </configuration>
 +                    </execution>
 +                </executions>
 +            </plugin>
 +         <plugin>
 +            <groupId>org.apache.maven.plugins</groupId>
 +            <artifactId>maven-jar-plugin</artifactId>
 +            <version>2.4</version>
 +            <configuration>
 +               <includes>
 +                  <include>**/*.class</include>
 +                  <include>**/*.txt</include>
 +                  <include>**/DISCLAIMER</include>
 +                  <include>**/NOTICE</include>
 +                  <include>**/LICENSE</include>
 +                  <include>**/DEPENDENCIES</include>
 +               </includes>
 +            </configuration>
 +            <executions>
 +               <execution>
 +                  <goals>
 +                     <goal>test-jar</goal>
 +                  </goals>
 +                  <phase>package</phase>
 +               </execution>
 +            </executions>
 +         </plugin>
 +         <plugin>
 +            <artifactId>maven-assembly-plugin</artifactId>
 +            <version>2.2-beta-5</version>
 +            <executions>
 +               <execution>
 +                  <configuration>
 +                     <descriptor>src/main/assembly/binary-assembly-libzip.xml</descriptor>
 +                     <finalName>testlib-zip</finalName>
 +                  </configuration>
 +                  <phase>package</phase>
 +                  <goals>
 +                     <goal>attached</goal>
 +                  </goals>
 +               </execution>
 +            </executions>
 +         </plugin>
 +        </plugins>
 +        <pluginManagement>
 +            <plugins>
 +                <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
 +                <plugin>
 +                    <groupId>org.eclipse.m2e</groupId>
 +                    <artifactId>lifecycle-mapping</artifactId>
 +                    <version>1.0.0</version>
 +                    <configuration>
 +                        <lifecycleMappingMetadata>
 +                            <pluginExecutions>
 +                                <pluginExecution>
 +                                    <pluginExecutionFilter>
 +                                        <groupId> org.apache.asterix</groupId>
 +                                        <artifactId> lexer-generator-maven-plugin</artifactId>
 +                                        <versionRange>[0.1,)</versionRange>
 +                                        <goals>
 +                                            <goal>generate-lexer</goal>
 +                                        </goals>
 +                                    </pluginExecutionFilter>
 +                                    <action>
 +                                        <execute>
 +                                            <runOnIncremental>false</runOnIncremental>
 +                                        </execute>
 +                                    </action>
 +                                </pluginExecution>
 +                                <pluginExecution>
 +                                    <pluginExecutionFilter>
 +                                        <groupId> org.codehaus.mojo</groupId>
 +                                        <artifactId>build-helper-maven-plugin</artifactId>
 +                                        <versionRange>[1.7,)</versionRange>
 +                                        <goals>
 +                                            <goal>add-source</goal>
 +                                        </goals>
 +                                    </pluginExecutionFilter>
 +                                    <action>
 +                                        <ignore />
 +                                    </action>
 +                                </pluginExecution>
 +                            </pluginExecutions>
 +                        </lifecycleMappingMetadata>
 +                    </configuration>
 +                </plugin>
 +            </plugins>
 +        </pluginManagement>
 +    </build>
 +    <dependencies>
 +        <dependency>
 +            <groupId>javax.servlet</groupId>
 +            <artifactId>servlet-api</artifactId>
 +            <type>jar</type>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.asterix</groupId>
 +            <artifactId>asterix-om</artifactId>
 +            <version>0.8.9-SNAPSHOT</version>
 +            <type>jar</type>
 +            <scope>compile</scope>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.asterix</groupId>
 +            <artifactId>asterix-runtime</artifactId>
 +            <version>0.8.9-SNAPSHOT</version>
 +            <type>jar</type>
 +            <scope>compile</scope>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.hyracks</groupId>
 +            <artifactId>algebricks-compiler</artifactId>
 +        </dependency>
 +        <dependency>
 +            <groupId>com.kenai.nbpwr</groupId>
 +            <artifactId>org-apache-commons-io</artifactId>
 +            <version>1.3.1-201002241208</version>
 +            <scope>test</scope>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.twitter4j</groupId>
 +            <artifactId>twitter4j-core</artifactId>
-             <version>[4.0,)</version>
++            <version>4.0.3</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.twitter4j</groupId>
 +            <artifactId>twitter4j-stream</artifactId>
-             <version>[4.0,)</version>
++            <version>4.0.3</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.hadoop</groupId>
 +            <artifactId>hadoop-client</artifactId>
 +            <type>jar</type>
 +            <scope>compile</scope>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.hadoop</groupId>
 +            <artifactId>hadoop-hdfs</artifactId>
 +        </dependency>
 +        <dependency>
 +            <groupId>net.java.dev.rome</groupId>
 +            <artifactId>rome-fetcher</artifactId>
 +            <version>1.0.0</version>
 +            <type>jar</type>
 +            <scope>compile</scope>
 +            <exclusions>
 +                <exclusion>
 +                    <artifactId>rome</artifactId>
 +                    <groupId>net.java.dev.rome</groupId>
 +                </exclusion>
 +            </exclusions>
 +        </dependency>
 +        <dependency>
 +            <groupId>rome</groupId>
 +            <artifactId>rome</artifactId>
 +            <version>1.0.1-modified-01</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.hyracks</groupId>
 +            <artifactId>hyracks-hdfs-core</artifactId>
 +            <version>${hyracks.version}</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>jdom</groupId>
 +            <artifactId>jdom</artifactId>
 +            <version>1.0</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.asterix</groupId>
 +            <artifactId>asterix-common</artifactId>
 +            <version>0.8.9-SNAPSHOT</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>com.microsoft.windowsazure</groupId>
 +            <artifactId>microsoft-windowsazure-api</artifactId>
 +            <version>0.4.4</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.hive</groupId>
 +            <artifactId>hive-exec</artifactId>
 +            <version>0.13.0</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>javax.jdo</groupId>
 +            <artifactId>jdo2-api</artifactId>
 +            <version>2.3-20090302111651</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>com.e-movimento.tinytools</groupId>
 +            <artifactId>privilegedaccessor</artifactId>
 +            <version>1.2.2</version>
 +            <scope>test</scope>
 +        </dependency>
 +        <dependency>
 +            <groupId>com.couchbase.client</groupId>
 +            <artifactId>core-io</artifactId>
 +            <version>1.2.3</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>io.reactivex</groupId>
 +            <artifactId>rxjava</artifactId>
 +            <version>1.0.15</version>
 +        </dependency>
 +    </dependencies>
 +</project>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index a03ad1a,0000000..d3abd50
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@@ -1,162 -1,0 +1,165 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.adapter.factory;
 +
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.api.IAdapterFactory;
 +import org.apache.asterix.external.api.IDataFlowController;
 +import org.apache.asterix.external.api.IDataParserFactory;
 +import org.apache.asterix.external.api.IDataSourceAdapter;
 +import org.apache.asterix.external.api.IExternalDataSourceFactory;
 +import org.apache.asterix.external.api.IIndexibleExternalDataSource;
 +import org.apache.asterix.external.api.IIndexingAdapterFactory;
 +import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 +import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 +import org.apache.asterix.external.dataset.adapter.GenericAdapter;
 +import org.apache.asterix.external.indexing.ExternalFile;
 +import org.apache.asterix.external.provider.DataflowControllerProvider;
 +import org.apache.asterix.external.provider.DatasourceFactoryProvider;
 +import org.apache.asterix.external.provider.ParserFactoryProvider;
 +import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.ExternalDataUtils;
++import org.apache.asterix.external.util.FeedLogManager;
 +import org.apache.asterix.external.util.FeedUtils;
 +import org.apache.asterix.om.types.ARecordType;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 +import org.apache.hyracks.api.context.IHyracksTaskContext;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.dataflow.std.file.FileSplit;
 +
 +public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterFactory {
 +
 +    private static final long serialVersionUID = 1L;
 +    private IExternalDataSourceFactory dataSourceFactory;
 +    private IDataParserFactory dataParserFactory;
 +    private ARecordType recordType;
 +    private Map<String, String> configuration;
 +    private List<ExternalFile> files;
 +    private boolean indexingOp;
 +    private boolean isFeed;
 +    private FileSplit[] feedLogFileSplits;
 +    private ARecordType metaType;
++    private FeedLogManager feedLogManager = null;
 +
 +    @Override
 +    public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
 +        this.files = files;
 +        this.indexingOp = indexingOp;
 +    }
 +
 +    @Override
 +    public String getAlias() {
 +        return ExternalDataConstants.ALIAS_GENERIC_ADAPTER;
 +    }
 +
 +    @Override
 +    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException {
 +        return dataSourceFactory.getPartitionConstraint();
 +    }
 +
 +    /**
 +     * Runs on each node controller (after serialization-deserialization)
 +     */
 +    @Override
 +    public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition)
 +            throws HyracksDataException {
 +        try {
 +            restoreExternalObjects();
 +        } catch (AsterixException e) {
 +            throw new HyracksDataException(e);
 +        }
++        if (isFeed) {
++            if (feedLogManager == null) {
++                feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits);
++            }
++            feedLogManager.touch();
++        }
 +        IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition,
-                 dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogFileSplits);
++                dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogManager);
 +        if (isFeed) {
 +            return new FeedAdapter((AbstractFeedDataFlowController) controller);
 +        } else {
 +            return new GenericAdapter(controller);
 +        }
 +    }
 +
 +    private void restoreExternalObjects() throws AsterixException {
 +        if (dataSourceFactory == null) {
 +            dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
 +            // create and configure parser factory
 +            if (dataSourceFactory.isIndexible() && (files != null)) {
 +                ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
 +            }
 +            dataSourceFactory.configure(configuration);
 +        }
 +        if (dataParserFactory == null) {
 +            // create and configure parser factory
 +            dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
 +            dataParserFactory.setRecordType(recordType);
 +            dataParserFactory.setMetaType(metaType);
 +            dataParserFactory.configure(configuration);
 +        }
 +    }
 +
 +    @Override
 +    public void configure(Map<String, String> configuration, ARecordType outputType, ARecordType metaType)
 +            throws AsterixException {
 +        this.recordType = outputType;
 +        this.metaType = metaType;
 +        this.configuration = configuration;
 +        dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
- 
 +        dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
-         prepare();
++        if (dataSourceFactory.isIndexible() && (files != null)) {
++            ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
++        }
++        dataSourceFactory.configure(configuration);
++        dataParserFactory.setRecordType(recordType);
++        dataParserFactory.setMetaType(metaType);
++        dataParserFactory.configure(configuration);
 +        ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory, dataParserFactory);
 +        configureFeedLogManager();
 +        nullifyExternalObjects();
 +    }
 +
 +    private void configureFeedLogManager() throws AsterixException {
 +        this.isFeed = ExternalDataUtils.isFeed(configuration);
 +        if (isFeed) {
 +            feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
 +                    ExternalDataUtils.getFeedName(configuration), dataSourceFactory.getPartitionConstraint());
 +        }
 +    }
 +
 +    private void nullifyExternalObjects() {
 +        if (ExternalDataUtils.isExternal(configuration.get(ExternalDataConstants.KEY_READER))) {
 +            dataSourceFactory = null;
 +        }
 +        if (ExternalDataUtils.isExternal(configuration.get(ExternalDataConstants.KEY_PARSER))) {
 +            dataParserFactory = null;
 +        }
 +    }
 +
-     private void prepare() throws AsterixException {
-         if (dataSourceFactory.isIndexible() && (files != null)) {
-             ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
-         }
-         dataSourceFactory.configure(configuration);
-         dataParserFactory.setRecordType(recordType);
-         dataParserFactory.setMetaType(metaType);
-         dataParserFactory.configure(configuration);
-     }
- 
 +    @Override
 +    public ARecordType getAdapterOutputType() {
 +        return recordType;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
index 83d7a3a,0000000..a4c2fae
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
@@@ -1,44 -1,0 +1,50 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.api;
 +
 +import java.io.InputStream;
 +
 +import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 +import org.apache.asterix.external.util.FeedLogManager;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
 +
 +public abstract class AsterixInputStream extends InputStream {
 +
 +    protected AbstractFeedDataFlowController controller;
 +    protected FeedLogManager logManager;
++    protected IStreamNotificationHandler notificationHandler;
 +
 +    public abstract boolean stop() throws Exception;
 +
 +    public abstract boolean handleException(Throwable th);
 +
 +    // TODO: Find a better way to send notifications
 +    public void setController(AbstractFeedDataFlowController controller) {
 +        this.controller = controller;
 +    }
 +
 +    // TODO: Find a better way to send notifications
-     public void setFeedLogManager(FeedLogManager logManager) {
++    public void setFeedLogManager(FeedLogManager logManager) throws HyracksDataException {
 +        this.logManager = logManager;
 +    }
++
++    public void setNotificationHandler(IStreamNotificationHandler notificationHandler) {
++        this.notificationHandler = notificationHandler;
++    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 11e2472,0000000..9cce1c9
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@@ -1,71 -1,0 +1,76 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.api;
 +
 +import java.io.Closeable;
 +import java.io.IOException;
 +
 +import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 +import org.apache.asterix.external.util.FeedLogManager;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
 +
 +/**
 + * This interface represents a record reader that reads data from external source as a set of records
++ *
 + * @param <T>
 + */
 +public interface IRecordReader<T> extends Closeable {
 +
 +    /**
 +     * @return true if the reader has more records remaining, false, otherwise.
 +     * @throws Exception
-      *         if an error takes place
++     *             if an error takes place
 +     */
 +    public boolean hasNext() throws Exception;
 +
 +    /**
 +     * @return the object representing the next record.
 +     * @throws IOException
 +     * @throws InterruptedException
 +     */
 +    public IRawRecord<T> next() throws IOException, InterruptedException;
 +
 +    /**
 +     * used to stop reader from producing more records.
++     *
 +     * @return true if the connection to the external source has been suspended, false otherwise.
 +     */
 +    public boolean stop();
 +
 +    // TODO: Find a better way to do flushes, this doesn't fit here
 +    /**
 +     * set a pointer to the controller of the feed. the controller can be used to flush()
 +     * parsed records when waiting for more records to be pushed
 +     */
 +    public void setController(AbstractFeedDataFlowController controller);
 +
 +    // TODO: Find a better way to perform logging. this doesn't fit here
 +    /**
 +     * set a pointer to the log manager of the feed. the log manager can be used to log
 +     * progress and errors
++     *
++     * @throws HyracksDataException
 +     */
-     public void setFeedLogManager(FeedLogManager feedLogManager);
++    public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException;
 +
 +    /**
 +     * gives the record reader a chance to recover from IO errors during feed intake
 +     */
 +    public boolean handleException(Throwable th);
 +}