Posted to commits@asterixdb.apache.org by im...@apache.org on 2016/04/07 17:00:09 UTC

[34/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master' into hyracks-merge2

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index c67eb70,0000000..cc50b75
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@@ -1,211 -1,0 +1,232 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.api.common;
 +
 +import java.io.File;
++import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.EnumSet;
 +import java.util.List;
 +
 +import org.apache.asterix.common.config.AsterixPropertiesAccessor;
 +import org.apache.asterix.common.config.GlobalConfig;
 +import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
 +import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
 +import org.apache.commons.io.FileUtils;
 +import org.apache.hyracks.api.client.HyracksConnection;
 +import org.apache.hyracks.api.client.IHyracksClientConnection;
 +import org.apache.hyracks.api.job.JobFlag;
 +import org.apache.hyracks.api.job.JobId;
 +import org.apache.hyracks.api.job.JobSpecification;
 +import org.apache.hyracks.control.cc.ClusterControllerService;
 +import org.apache.hyracks.control.common.controllers.CCConfig;
 +import org.apache.hyracks.control.common.controllers.NCConfig;
 +import org.apache.hyracks.control.nc.NodeControllerService;
 +
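 +/**
 + * Runs an in-process Hyracks mini-cluster (one CC plus one NC per configured node) for tests.
 + * A minimal usage sketch, mirroring {@link #main(String[])}:
 + *
 + * <pre>
 + * System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration.xml");
 + * AsterixHyracksIntegrationUtil.init(true); // true deletes any old instance data first
 + * IHyracksClientConnection hcc = AsterixHyracksIntegrationUtil.getHyracksClientConnection();
 + * // ... submit Hyracks jobs via hcc or runJob(spec) ...
 + * AsterixHyracksIntegrationUtil.deinit(true);
 + * </pre>
 + */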
 +public class AsterixHyracksIntegrationUtil {
 +
 +    private static final String IO_DIR_KEY = "java.io.tmpdir";
 +    public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
 +    public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
 +
 +    public static ClusterControllerService cc;
 +    public static NodeControllerService[] ncs;
 +    public static IHyracksClientConnection hcc;
 +
 +    private static AsterixPropertiesAccessor propertiesAccessor;
 +
 +    public static void init(boolean deleteOldInstanceData) throws Exception {
 +        propertiesAccessor = new AsterixPropertiesAccessor();
 +        ncs = new NodeControllerService[propertiesAccessor.getNodeNames().size()];
 +        if (deleteOldInstanceData) {
 +            deleteTransactionLogs();
 +            removeTestStorageFiles();
 +        }
 +
 +        CCConfig ccConfig = new CCConfig();
 +        ccConfig.clusterNetIpAddress = "127.0.0.1";
 +        ccConfig.clientNetIpAddress = "127.0.0.1";
 +        ccConfig.clientNetPort = DEFAULT_HYRACKS_CC_CLIENT_PORT;
 +        ccConfig.clusterNetPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
 +        ccConfig.defaultMaxJobAttempts = 0;
 +        ccConfig.resultTTL = 30000;
 +        ccConfig.resultSweepThreshold = 1000;
 +        ccConfig.appCCMainClass = CCApplicationEntryPoint.class.getName();
 +        // ccConfig.useJOL = true;
 +        cc = new ClusterControllerService(ccConfig);
 +        cc.start();
 +
 +        // Starts ncs.
 +        int n = 0;
 +        List<String> nodes = propertiesAccessor.getNodeNames();
 +        for (String ncName : nodes) {
 +            NCConfig ncConfig1 = new NCConfig();
 +            ncConfig1.ccHost = "localhost";
 +            ncConfig1.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
 +            ncConfig1.clusterNetIPAddress = "127.0.0.1";
 +            ncConfig1.dataIPAddress = "127.0.0.1";
 +            ncConfig1.resultIPAddress = "127.0.0.1";
 +            ncConfig1.nodeId = ncName;
 +            ncConfig1.resultTTL = 30000;
 +            ncConfig1.resultSweepThreshold = 1000;
 +            ncConfig1.appArgs = Arrays.asList("-virtual-NC");
 +            String tempPath = System.getProperty(IO_DIR_KEY);
 +            if (tempPath.endsWith(File.separator)) {
 +                tempPath = tempPath.substring(0, tempPath.length() - 1);
 +            }
 +            System.err.println("Using the path: " + tempPath);
 +            // get initial partitions from properties
 +            String[] nodeStores = propertiesAccessor.getStores().get(ncName);
 +            if (nodeStores == null) {
 +                throw new Exception("Couldn't find stores for NC: " + ncName);
 +            }
 +            String tempDirPath = System.getProperty(IO_DIR_KEY);
 +            if (!tempDirPath.endsWith(File.separator)) {
 +                tempDirPath += File.separator;
 +            }
 +            for (int p = 0; p < nodeStores.length; p++) {
 +                // create IO devices based on stores
 +                String iodevicePath = tempDirPath + ncConfig1.nodeId + File.separator + nodeStores[p];
 +                File ioDeviceDir = new File(iodevicePath);
 +                ioDeviceDir.mkdirs();
 +                if (p == 0) {
 +                    ncConfig1.ioDevices = iodevicePath;
 +                } else {
 +                    ncConfig1.ioDevices += "," + iodevicePath;
 +                }
 +            }
 +            ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
 +            NodeControllerService nodeControllerService = new NodeControllerService(ncConfig1);
 +            ncs[n] = nodeControllerService;
 +            Thread ncStartThread = new Thread() {
 +                @Override
 +                public void run() {
 +                    try {
 +                        nodeControllerService.start();
 +                    } catch (Exception e) {
 +                        e.printStackTrace();
 +                    }
 +                }
 +            };
 +            ncStartThread.start();
 +            ++n;
 +        }
 +        hcc = new HyracksConnection(cc.getConfig().clientNetIpAddress, cc.getConfig().clientNetPort);
 +    }
 +
 +    public static String[] getNcNames() {
 +        return propertiesAccessor.getNodeNames().toArray(new String[propertiesAccessor.getNodeNames().size()]);
 +    }
 +
 +    public static IHyracksClientConnection getHyracksClientConnection() {
 +        return hcc;
 +    }
 +
 +    public static void deinit(boolean deleteOldInstanceData) throws Exception {
++        //stop NCs
++        ArrayList<Thread> stopNCThreads = new ArrayList<>();
 +        for (int n = 0; n < ncs.length; ++n) {
-             if (ncs[n] != null)
-                 ncs[n].stop();
++            NodeControllerService nodeControllerService = ncs[n];
++            if (nodeControllerService != null) {
++                Thread ncStopThread = new Thread() {
++                    @Override
++                    public void run() {
++                        try {
++                            nodeControllerService.stop();
++                        } catch (Exception e) {
++                            e.printStackTrace();
++                        }
++                    }
++                };
++                stopNCThreads.add(ncStopThread);
++                ncStopThread.start();
++            }
++        }
 +
++        //make sure all NCs have stopped
++        for (Thread stopNcThread : stopNCThreads) {
++            stopNcThread.join();
 +        }
++
 +        if (cc != null) {
 +            cc.stop();
 +        }
 +
 +        if (deleteOldInstanceData) {
 +            deleteTransactionLogs();
 +            removeTestStorageFiles();
 +        }
 +    }
 +
 +    public static void runJob(JobSpecification spec) throws Exception {
 +        GlobalConfig.ASTERIX_LOGGER.info(spec.toJSON().toString());
 +        JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
 +        GlobalConfig.ASTERIX_LOGGER.info(jobId.toString());
 +        hcc.waitForCompletion(jobId);
 +    }
 +
 +    public static void removeTestStorageFiles() {
 +        File dir = new File(System.getProperty(IO_DIR_KEY));
 +        for (String ncName : propertiesAccessor.getNodeNames()) {
 +            File ncDir = new File(dir, ncName);
 +            FileUtils.deleteQuietly(ncDir);
 +        }
 +    }
 +
 +    private static void deleteTransactionLogs() throws Exception {
 +        for (String ncId : propertiesAccessor.getNodeNames()) {
 +            File log = new File(propertiesAccessor.getTransactionLogDirs().get(ncId));
 +            if (log.exists()) {
 +                FileUtils.deleteDirectory(log);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * main method to run a simple two-node cluster in-process
 +     * suggested VM arguments: <code>-enableassertions -Xmx2048m -Dfile.encoding=UTF-8</code>
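 +     * e.g. (an illustrative launch; assumes this class and its configuration are on the classpath):
 +     * <pre>
 +     * java -enableassertions -Xmx2048m -Dfile.encoding=UTF-8 \
 +     *         org.apache.asterix.api.common.AsterixHyracksIntegrationUtil
 +     * </pre>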
 +     *
 +     * @param args
 +     *            unused
 +     */
 +    public static void main(String[] args) {
 +        Runtime.getRuntime().addShutdownHook(new Thread() {
 +            @Override
 +            public void run() {
 +                try {
 +                    deinit(false);
 +                } catch (Exception e) {
 +                    e.printStackTrace();
 +                }
 +            }
 +        });
 +        try {
 +            System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration.xml");
 +
 +            init(false);
 +            while (true) {
 +                Thread.sleep(10000);
 +            }
 +        } catch (Exception e) {
 +            e.printStackTrace();
 +            System.exit(1);
 +        }
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index 5cd490a,0000000..d8f1893
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@@ -1,273 -1,0 +1,273 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.app.external;
 +
 +import java.util.Collection;
 +import java.util.List;
++import java.util.Set;
++import java.util.TreeSet;
 +
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.common.utils.StoragePathUtil;
 +import org.apache.asterix.external.api.IAdapterFactory;
 +import org.apache.asterix.external.feed.api.IFeedJoint;
 +import org.apache.asterix.external.feed.api.IFeedMessage;
 +import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 +import org.apache.asterix.external.feed.management.FeedConnectionId;
 +import org.apache.asterix.external.feed.management.FeedId;
 +import org.apache.asterix.external.feed.message.EndFeedMessage;
 +import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
 +import org.apache.asterix.external.feed.message.PrepareStallMessage;
 +import org.apache.asterix.external.feed.message.TerminateDataFlowMessage;
 +import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
 +import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 +import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
 +import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor;
 +import org.apache.asterix.external.util.FeedConstants;
 +import org.apache.asterix.external.util.FeedUtils;
 +import org.apache.asterix.file.JobSpecificationUtils;
 +import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 +import org.apache.asterix.metadata.entities.Feed;
 +import org.apache.asterix.om.util.AsterixClusterProperties;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 +import org.apache.hyracks.algebricks.common.utils.Pair;
 +import org.apache.hyracks.algebricks.common.utils.Triple;
 +import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 +import org.apache.hyracks.api.job.JobSpecification;
 +import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 +import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
 +import org.apache.hyracks.dataflow.std.file.FileSplit;
 +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 +import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 +
 +/**
 + * Provides helper methods for creating job specifications for operations on a feed.
 + */
 +public class FeedOperations {
 +
 +    /**
 +     * Builds the job spec for ingesting a (primary) feed from its external source via the feed adaptor.
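 +     * A sketch of intended use (variable names are illustrative):
 +     *
 +     * <pre>
 +     * Pair&lt;JobSpecification, IAdapterFactory&gt; p =
 +     *         FeedOperations.buildFeedIntakeJobSpec(primaryFeed, metadataProvider, policyAccessor);
 +     * JobSpecification intakeJob = p.first;
 +     * IAdapterFactory adapterFactory = p.second;
 +     * </pre>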
++     *
 +     * @param primaryFeed
 +     *            the (primary) feed to ingest from its external source
 +     * @param metadataProvider
 +     *            metadata provider used to build the feed intake runtime
 +     * @param policyAccessor
 +     *            accessor for the ingestion policy to apply to the feed
 +     * @return a pair of the Hyracks job specification for receiving data from the external source
 +     *         and the adapter factory instantiated for the feed
 +     * @throws Exception
 +     */
 +    public static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed primaryFeed,
 +            AqlMetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
- 
 +        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
 +        spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE);
 +        IAdapterFactory adapterFactory = null;
 +        IOperatorDescriptor feedIngestor;
 +        AlgebricksPartitionConstraint ingesterPc;
- 
-         try {
-             Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t = metadataProvider
-                     .buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor);
-             feedIngestor = t.first;
-             ingesterPc = t.second;
-             adapterFactory = t.third;
-         } catch (AlgebricksException e) {
-             e.printStackTrace();
-             throw new AsterixException(e);
-         }
- 
++        Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t = metadataProvider
++                .buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor);
++        feedIngestor = t.first;
++        ingesterPc = t.second;
++        adapterFactory = t.third;
 +        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc);
- 
 +        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
 +        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc);
 +        spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0);
 +        spec.addRoot(nullSink);
 +        return new Pair<JobSpecification, IAdapterFactory>(spec, adapterFactory);
 +    }
 +
 +    public static JobSpecification buildDiscontinueFeedSourceSpec(AqlMetadataProvider metadataProvider, FeedId feedId)
 +            throws AsterixException, AlgebricksException {
 +
 +        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
 +        IOperatorDescriptor feedMessenger = null;
 +        AlgebricksPartitionConstraint messengerPc = null;
 +
 +        List<String> locations = FeedLifecycleListener.INSTANCE.getIntakeLocations(feedId);
 +        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDiscontinueFeedMessengerRuntime(spec, feedId,
 +                locations);
 +
 +        feedMessenger = p.first;
 +        messengerPc = p.second;
 +
 +        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
 +        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
 +        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
 +        spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
 +        spec.addRoot(nullSink);
 +
 +        return spec;
 +    }
 +
 +    /**
 +     * Builds the job spec for sending a message to an active feed to disconnect it from its
 +     * source. The Boolean in the returned pair indicates whether the intake job should also be
 +     * terminated, i.e. the disconnecting entity is the source feed joint's last receiver.
 +     */
 +    public static Pair<JobSpecification, Boolean> buildDisconnectFeedJobSpec(AqlMetadataProvider metadataProvider,
 +            FeedConnectionId connectionId) throws AsterixException, AlgebricksException {
 +
 +        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
 +        IOperatorDescriptor feedMessenger;
 +        AlgebricksPartitionConstraint messengerPc;
 +        List<String> locations = null;
 +        FeedRuntimeType sourceRuntimeType;
 +        try {
 +            FeedConnectJobInfo cInfo = FeedLifecycleListener.INSTANCE.getFeedConnectJobInfo(connectionId);
 +            IFeedJoint sourceFeedJoint = cInfo.getSourceFeedJoint();
 +            IFeedJoint computeFeedJoint = cInfo.getComputeFeedJoint();
 +
 +            boolean terminateIntakeJob = false;
 +            boolean completeDisconnect = computeFeedJoint == null || computeFeedJoint.getReceivers().isEmpty();
 +            if (completeDisconnect) {
 +                sourceRuntimeType = FeedRuntimeType.INTAKE;
 +                locations = cInfo.getCollectLocations();
 +                terminateIntakeJob = sourceFeedJoint.getReceivers().size() == 1;
 +            } else {
 +                locations = cInfo.getComputeLocations();
 +                sourceRuntimeType = FeedRuntimeType.COMPUTE;
 +            }
 +
 +            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDisconnectFeedMessengerRuntime(spec,
 +                    connectionId, locations, sourceRuntimeType, completeDisconnect, sourceFeedJoint.getOwnerFeedId());
 +
 +            feedMessenger = p.first;
 +            messengerPc = p.second;
 +
 +            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
 +            NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
 +            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
 +            spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
 +            spec.addRoot(nullSink);
 +            return new Pair<JobSpecification, Boolean>(spec, terminateIntakeJob);
 +
 +        } catch (AlgebricksException e) {
 +            throw new AsterixException(e);
 +        }
 +
 +    }
 +
 +    public static JobSpecification buildPrepareStallMessageJob(PrepareStallMessage stallMessage,
 +            Collection<String> collectLocations) throws AsterixException {
 +        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
 +        try {
 +            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
 +                    messageJobSpec, stallMessage.getConnectionId(), stallMessage, collectLocations);
 +            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
 +        } catch (AlgebricksException ae) {
 +            throw new AsterixException(ae);
 +        }
 +        return messageJobSpec;
 +    }
 +
 +    public static JobSpecification buildNotifyThrottlingEnabledMessageJob(
 +            ThrottlingEnabledFeedMessage throttlingEnabledMesg, Collection<String> locations) throws AsterixException {
 +        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
 +        try {
 +            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
 +                    messageJobSpec, throttlingEnabledMesg.getConnectionId(), throttlingEnabledMesg, locations);
 +            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
 +        } catch (AlgebricksException ae) {
 +            throw new AsterixException(ae);
 +        }
 +        return messageJobSpec;
 +    }
 +
 +    public static JobSpecification buildTerminateFlowMessageJob(TerminateDataFlowMessage terminateMessage,
 +            List<String> collectLocations) throws AsterixException {
 +        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
 +        try {
 +            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
 +                    messageJobSpec, terminateMessage.getConnectionId(), terminateMessage, collectLocations);
 +            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
 +        } catch (AlgebricksException ae) {
 +            throw new AsterixException(ae);
 +        }
 +        return messageJobSpec;
 +    }
 +
 +    public static JobSpecification buildCommitAckResponseJob(FeedTupleCommitResponseMessage commitResponseMessage,
 +            Collection<String> targetLocations) throws AsterixException {
 +        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
 +        try {
 +            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
 +                    messageJobSpec, commitResponseMessage.getConnectionId(), commitResponseMessage, targetLocations);
 +            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
 +        } catch (AlgebricksException ae) {
 +            throw new AsterixException(ae);
 +        }
 +        return messageJobSpec;
 +    }
 +
 +    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDiscontinueFeedMessengerRuntime(
 +            JobSpecification jobSpec, FeedId feedId, List<String> locations) throws AlgebricksException {
 +        FeedConnectionId feedConnectionId = new FeedConnectionId(feedId, null);
 +        IFeedMessage feedMessage = new EndFeedMessage(feedConnectionId, FeedRuntimeType.INTAKE,
 +                feedConnectionId.getFeedId(), true, EndFeedMessage.EndMessageType.DISCONTINUE_SOURCE);
 +        return buildSendFeedMessageRuntime(jobSpec, feedConnectionId, feedMessage, locations);
 +    }
 +
 +    private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildSendFeedMessageRuntime(
 +            JobSpecification jobSpec, FeedConnectionId feedConnectionId, IFeedMessage feedMessage,
 +            Collection<String> locations) throws AlgebricksException {
 +        AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(
 +                locations.toArray(new String[] {}));
 +        FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, feedConnectionId,
 +                feedMessage);
 +        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
 +    }
 +
 +    private static JobSpecification buildSendFeedMessageJobSpec(IOperatorDescriptor operatorDescriptor,
 +            AlgebricksPartitionConstraint messengerPc, JobSpecification messageJobSpec) {
 +        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(messageJobSpec, operatorDescriptor,
 +                messengerPc);
 +        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(messageJobSpec);
 +        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(messageJobSpec, nullSink, messengerPc);
 +        messageJobSpec.connect(new OneToOneConnectorDescriptor(messageJobSpec), operatorDescriptor, 0, nullSink, 0);
 +        messageJobSpec.addRoot(nullSink);
 +        return messageJobSpec;
 +    }
 +
 +    private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
 +            JobSpecification jobSpec, FeedConnectionId feedConnectionId, List<String> locations,
 +            FeedRuntimeType sourceFeedRuntimeType, boolean completeDisconnection, FeedId sourceFeedId)
-                     throws AlgebricksException {
++            throws AlgebricksException {
 +        IFeedMessage feedMessage = new EndFeedMessage(feedConnectionId, sourceFeedRuntimeType, sourceFeedId,
 +                completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED);
 +        return buildSendFeedMessageRuntime(jobSpec, feedConnectionId, feedMessage, locations);
 +    }
 +
 +    public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws Exception {
 +        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-         AlgebricksAbsolutePartitionConstraint locations = AsterixClusterProperties.INSTANCE.getClusterLocations();
++        AlgebricksAbsolutePartitionConstraint allCluster = AsterixClusterProperties.INSTANCE.getClusterLocations();
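++        // collect distinct node names: the cluster constraint may list a node more than once
++        // (e.g. once per partition), while the feed log storage should be removed once per node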
++        Set<String> nodes = new TreeSet<>();
++        for (String node : allCluster.getLocations()) {
++            nodes.add(node);
++        }
++        AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(
++                nodes.toArray(new String[nodes.size()]));
 +        FileSplit[] feedLogFileSplits = FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(),
 +                locations);
 +        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = StoragePathUtil
 +                .splitProviderAndPartitionConstraints(feedLogFileSplits);
-         FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first);
++        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first, true);
 +        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second);
 +        spec.addRoot(frod);
 +        return spec;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
index 06d2b71,0000000..9052696
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
@@@ -1,265 -1,0 +1,261 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.asterix.file;
 +
 +import java.io.File;
 +import java.rmi.RemoteException;
 +import java.util.Map;
 +import java.util.logging.Logger;
 +
 +import org.apache.asterix.common.api.ILocalResourceMetadata;
 +import org.apache.asterix.common.config.AsterixStorageProperties;
 +import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 +import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
 +import org.apache.asterix.common.exceptions.ACIDException;
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 +import org.apache.asterix.formats.base.IDataFormat;
 +import org.apache.asterix.metadata.MetadataManager;
 +import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 +import org.apache.asterix.metadata.entities.Dataset;
 +import org.apache.asterix.metadata.entities.Dataverse;
 +import org.apache.asterix.metadata.utils.DatasetUtils;
 +import org.apache.asterix.om.types.ARecordType;
 +import org.apache.asterix.om.util.AsterixAppContextInfo;
 +import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
 +import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
 +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
 +import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 +import org.apache.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 +import org.apache.hyracks.algebricks.common.utils.Pair;
 +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 +import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.api.job.JobSpecification;
 +import org.apache.hyracks.dataflow.std.file.FileSplit;
 +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 +import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
 +import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
 +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 +import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 +import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
 +import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
 +import org.apache.hyracks.storage.common.file.LocalResource;
 +
 +public class DatasetOperations {
 +
 +    private static final Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
 +
 +    public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
 +            AqlMetadataProvider metadataProvider)
-                     throws AlgebricksException, HyracksDataException, RemoteException, ACIDException, AsterixException {
++            throws AlgebricksException, HyracksDataException, RemoteException, ACIDException, AsterixException {
 +
 +        String dataverseName = null;
 +        if (datasetDropStmt.getDataverseName() != null) {
 +            dataverseName = datasetDropStmt.getDataverseName();
 +        } else if (metadataProvider.getDefaultDataverse() != null) {
 +            dataverseName = metadataProvider.getDefaultDataverse().getDataverseName();
 +        }
 +
 +        String datasetName = datasetDropStmt.getDatasetName();
 +        String datasetPath = dataverseName + File.separator + datasetName;
 +
 +        LOGGER.info("DROP DATASETPATH: " + datasetPath);
 +
 +        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
 +        if (dataset == null) {
 +            throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
 +        }
 +        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
 +            return JobSpecificationUtils.createJobSpecification();
 +        }
 +        boolean temp = dataset.getDatasetDetails().isTemp();
 +
 +        Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
 +                dataverseName);
 +        IDataFormat format;
 +        try {
 +            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
 +        } catch (Exception e) {
 +            throw new AsterixException(e);
 +        }
 +
 +        ARecordType itemType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
 +                dataset.getItemTypeName());
 +
 +        ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
 +        IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
 +                itemType, format.getBinaryComparatorFactoryProvider());
 +        int[] filterFields = DatasetUtils.createFilterFields(dataset);
 +        int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
 +        JobSpecification specPrimary = JobSpecificationUtils.createJobSpecification();
 +
 +        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
 +                .splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), datasetName, datasetName,
 +                        temp);
 +        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
 +        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
 +                metadataProvider.getMetadataTxnContext());
 +
 +        IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary,
 +                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
 +                splitsAndConstraint.first,
 +                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
 +                        compactionInfo.first, compactionInfo.second,
 +                        new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
 +                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
 +                        storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
 +                        btreeFields, filterFields, !temp));
 +        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
 +                splitsAndConstraint.second);
 +
 +        specPrimary.addRoot(primaryBtreeDrop);
 +
 +        return specPrimary;
 +    }
 +
 +    public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
 +            AqlMetadataProvider metadata) throws AsterixException, AlgebricksException {
 +        String dataverseName = dataverse.getDataverseName();
 +        IDataFormat format;
 +        try {
 +            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
 +        } catch (Exception e) {
 +            throw new AsterixException(e);
 +        }
 +        Dataset dataset = metadata.findDataset(dataverseName, datasetName);
 +        if (dataset == null) {
 +            throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
 +        }
 +        boolean temp = dataset.getDatasetDetails().isTemp();
 +        ARecordType itemType = (ARecordType) metadata.findType(dataset.getItemTypeDataverseName(),
 +                dataset.getItemTypeName());
 +        // get meta item type
 +        ARecordType metaItemType = null;
 +        if (dataset.hasMetaPart()) {
 +            metaItemType = (ARecordType) metadata.findType(dataset.getMetaItemTypeDataverseName(),
 +                    dataset.getMetaItemTypeName());
 +        }
 +        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
 +        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
 +                itemType, format.getBinaryComparatorFactoryProvider());
 +        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
 +        int[] bloomFilterKeyFields = DatasetUtils.createBloomFilterKeyFields(dataset);
 +
 +        ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
 +        IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
 +                itemType, format.getBinaryComparatorFactoryProvider());
 +        int[] filterFields = DatasetUtils.createFilterFields(dataset);
 +        int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
 +
 +        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
 +                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
 +        FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
 +        StringBuilder sb = new StringBuilder();
 +        for (int i = 0; i < fs.length; i++) {
 +            sb.append(stringOf(fs[i]) + " ");
 +        }
 +        LOGGER.info("CREATING File Splits: " + sb.toString());
 +
 +        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
 +                metadata.getMetadataTxnContext());
 +        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
 +        //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
 +        ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
 +                comparatorFactories, bloomFilterKeyFields, true, dataset.getDatasetId(), compactionInfo.first,
 +                compactionInfo.second, filterTypeTraits, filterCmpFactories, btreeFields, filterFields);
 +        ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
 +                localResourceMetadata, LocalResource.LSMBTreeResource);
 +
 +        TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
 +                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
 +                splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
 +                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
 +                        compactionInfo.first, compactionInfo.second,
 +                        new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
 +                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
 +                        storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
 +                        btreeFields, filterFields, !temp),
 +                localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
 +        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
 +                splitsAndConstraint.second);
 +        spec.addRoot(indexCreateOp);
 +        return spec;
 +    }
 +
 +    private static String stringOf(FileSplit fs) {
 +        return fs.getNodeName() + ":" + fs.getLocalFile().toString();
 +    }
 +
 +    public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, String datasetName,
 +            AqlMetadataProvider metadata) throws AsterixException, AlgebricksException {
 +        String dataverseName = dataverse.getDataverseName();
 +        IDataFormat format;
 +        try {
 +            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
 +        } catch (Exception e) {
 +            throw new AsterixException(e);
 +        }
 +        Dataset dataset = metadata.findDataset(dataverseName, datasetName);
 +        if (dataset == null) {
 +            throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
 +        }
 +        boolean temp = dataset.getDatasetDetails().isTemp();
- 
 +        ARecordType itemType = (ARecordType) metadata.findType(dataset.getItemTypeDataverseName(),
 +                dataset.getItemTypeName());
++        ARecordType metaItemType = DatasetUtils.getMetaType(metadata, dataset);
 +        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
 +        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
 +                itemType, format.getBinaryComparatorFactoryProvider());
-         ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
++        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
 +        int[] bloomFilterKeyFields = DatasetUtils.createBloomFilterKeyFields(dataset);
- 
 +        ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
 +        IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
 +                itemType, format.getBinaryComparatorFactoryProvider());
 +        int[] filterFields = DatasetUtils.createFilterFields(dataset);
 +        int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
- 
 +        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
 +                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
- 
 +        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
- 
 +        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
 +                metadata.getMetadataTxnContext());
 +        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
 +                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
 +                splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
 +                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
 +                        compactionInfo.first, compactionInfo.second,
 +                        new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
 +                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
 +                        storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
 +                        btreeFields, filterFields, !temp),
 +                NoOpOperationCallbackFactory.INSTANCE);
 +        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
 +                splitsAndConstraint.second);
 +        spec.addRoot(compactOp);
 +        return spec;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
index c77ca10,0000000..d5765f1
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
@@@ -1,40 -1,0 +1,40 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.file;
 +
 +import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 +import org.apache.asterix.metadata.entities.Dataverse;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 +import org.apache.hyracks.algebricks.common.utils.Pair;
 +import org.apache.hyracks.api.job.JobSpecification;
 +import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
 +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 +
 +public class DataverseOperations {
 +    public static JobSpecification createDropDataverseJobSpec(Dataverse dataverse, AqlMetadataProvider metadata) {
 +        JobSpecification jobSpec = JobSpecificationUtils.createJobSpecification();
 +        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
 +                .splitProviderAndPartitionConstraintsForDataverse(dataverse.getDataverseName());
-         FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first);
++        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first, false);
 +        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second);
 +        jobSpec.addRoot(frod);
 +        return jobSpec;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 0a0a917,0000000..13b0189
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@@ -1,212 -1,0 +1,215 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.messaging;
 +
 +import java.util.Map;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.logging.Level;
 +import java.util.logging.Logger;
 +
 +import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 +import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 +import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
 +import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
 +import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
 +import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
 +import org.apache.asterix.common.messaging.ReplicaEventMessage;
 +import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
 +import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
 +import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
 +import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 +import org.apache.asterix.common.messaging.api.IApplicationMessage;
 +import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
 +import org.apache.asterix.common.messaging.api.INCMessageBroker;
 +import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 +import org.apache.asterix.common.replication.Replica;
 +import org.apache.asterix.common.replication.ReplicaEvent;
 +import org.apache.asterix.event.schema.cluster.Node;
 +import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
 +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 +import org.apache.hyracks.api.messages.IMessage;
 +import org.apache.hyracks.api.util.JavaSerializationUtils;
 +import org.apache.hyracks.control.nc.NodeControllerService;
 +
 +public class NCMessageBroker implements INCMessageBroker {
 +    private final static Logger LOGGER = Logger.getLogger(NCMessageBroker.class.getName());
 +
 +    private final NodeControllerService ncs;
 +    private final AtomicLong messageId = new AtomicLong(0);
 +    private final Map<Long, IApplicationMessageCallback> callbacks;
 +    private final IAsterixAppRuntimeContext appContext;
 +
 +    public NCMessageBroker(NodeControllerService ncs) {
 +        this.ncs = ncs;
 +        appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
 +        callbacks = new ConcurrentHashMap<Long, IApplicationMessageCallback>();
 +    }
 +
 +    @Override
 +    public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception {
 +        if (callback != null) {
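 +            //register the callback under a unique message id so receivedMessage() can route the response back to the sender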
 +            long uniqueMessageId = messageId.incrementAndGet();
 +            message.setId(uniqueMessageId);
 +            callbacks.put(uniqueMessageId, callback);
 +        }
 +        try {
 +            ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
 +        } catch (Exception e) {
 +            if (callback != null) {
 +                //remove the callback in case of failure
 +                callbacks.remove(message.getId());
 +            }
 +            throw e;
 +        }
 +    }
 +
 +    @Override
 +    public void receivedMessage(IMessage message, String nodeId) throws Exception {
 +        try {
 +            AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
 +            if (LOGGER.isLoggable(Level.INFO)) {
 +                LOGGER.info("Received message: " + absMessage.getMessageType().name());
 +            }
 +            //if the received message is a response to a sent message, deliver it to the sender
 +            IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
 +            if (callback != null) {
 +                callback.deliverMessageResponse(absMessage);
 +            }
 +
 +            //handle requests from CC
 +            switch (absMessage.getMessageType()) {
 +                case REPORT_MAX_RESOURCE_ID_REQUEST:
 +                    reportMaxResourceId();
 +                    break;
 +                case TAKEOVER_PARTITIONS_REQUEST:
 +                    handleTakeoverPartitions(message);
 +                    break;
 +                case TAKEOVER_METADATA_NODE_REQUEST:
 +                    handleTakeoverMetadataNode(message);
 +                    break;
 +                case PREPARE_PARTITIONS_FAILBACK_REQUEST:
 +                    handlePreparePartitionsFailback(message);
 +                    break;
 +                case COMPLETE_FAILBACK_REQUEST:
 +                    handleCompleteFailbackRequest(message);
 +                    break;
 +                case REPLICA_EVENT:
 +                    handleReplicaEvent(message);
 +                    break;
 +                default:
 +                    break;
 +            }
 +        } catch (Exception e) {
 +            e.printStackTrace();
 +            throw e;
 +        }
 +    }
 +
 +    private void handleTakeoverPartitions(IMessage message) throws Exception {
 +        TakeoverPartitionsRequestMessage msg = (TakeoverPartitionsRequestMessage) message;
-         try {
-             IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
-             remoteRecoeryManager.takeoverPartitons(msg.getPartitions());
-         } finally {
-             //send response after takeover is completed
-             TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
-                     appContext.getTransactionSubsystem().getId(), msg.getPartitions());
-             sendMessage(reponse, null);
++        //if the NC is shutting down, it should ignore the takeover partitions request
++        if (!appContext.isShuttingdown()) {
++            try {
++                IRemoteRecoveryManager remoteRecoveryManager = appContext.getRemoteRecoveryManager();
++                remoteRecoveryManager.takeoverPartitons(msg.getPartitions());
++            } finally {
++                //send response after takeover is completed
++                TakeoverPartitionsResponseMessage response = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
++                        appContext.getTransactionSubsystem().getId(), msg.getPartitions());
++                sendMessage(response, null);
++            }
 +        }
 +    }
 +
 +    private void handleTakeoverMetadataNode(IMessage message) throws Exception {
 +        try {
 +            appContext.initializeMetadata(false);
 +            appContext.exportMetadataNodeStub();
 +        } finally {
 +            TakeoverMetadataNodeResponseMessage response = new TakeoverMetadataNodeResponseMessage(
 +                    appContext.getTransactionSubsystem().getId());
 +            sendMessage(response, null);
 +        }
 +    }
 +
 +    @Override
 +    public void reportMaxResourceId() throws Exception {
 +        ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage();
 +        //resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for metadata indexes.
 +        long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
 +                MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
 +        maxResourceIdMsg.setMaxResourceId(maxResourceId);
 +        sendMessage(maxResourceIdMsg, null);
 +    }
 +
 +    private void handleReplicaEvent(IMessage message) {
 +        ReplicaEventMessage msg = (ReplicaEventMessage) message;
 +        Node node = new Node();
 +        node.setId(msg.getNodeId());
 +        node.setClusterIp(msg.getNodeIPAddress());
 +        Replica replica = new Replica(node);
 +        ReplicaEvent event = new ReplicaEvent(replica, msg.getEvent());
 +        appContext.getReplicationManager().reportReplicaEvent(event);
 +    }
 +
 +    private void handlePreparePartitionsFailback(IMessage message) throws Exception {
 +        PreparePartitionsFailbackRequestMessage msg = (PreparePartitionsFailbackRequestMessage) message;
 +        /**
 +         * If the metadata partition will be failed back,
 +         * we need to flush and close all datasets, including the metadata datasets.
 +         * Otherwise, we close all non-metadata datasets and flush the metadata datasets
 +         * so that their memory components will be copied to the failing-back node.
 +         */
 +        if (msg.isReleaseMetadataNode()) {
 +            appContext.getDatasetLifecycleManager().closeAllDatasets();
 +            //remove the metadata node stub from RMI registry
 +            appContext.unexportMetadataNodeStub();
 +        } else {
 +            //close all non-metadata datasets
 +            appContext.getDatasetLifecycleManager().closeUserDatasets();
 +            //flush the remaining metadata datasets that were not closed
 +            appContext.getDatasetLifecycleManager().flushAllDatasets();
 +        }
 +
 +        //mark the partitions to be closed as inactive
 +        PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext
 +                .getLocalResourceRepository();
 +        for (Integer partitionId : msg.getPartitions()) {
 +            localResourceRepo.addInactivePartition(partitionId);
 +        }
 +
 +        //send response after partitions prepared for failback
 +        PreparePartitionsFailbackResponseMessage response = new PreparePartitionsFailbackResponseMessage(msg.getPlanId(),
 +                msg.getRequestId(), msg.getPartitions());
 +        sendMessage(response, null);
 +    }
 +
 +    private void handleCompleteFailbackRequest(IMessage message) throws Exception {
 +        CompleteFailbackRequestMessage msg = (CompleteFailbackRequestMessage) message;
 +        try {
 +            IRemoteRecoveryManager remoteRecoveryManager = appContext.getRemoteRecoveryManager();
 +            remoteRecoveryManager.completeFailbackProcess();
 +        } finally {
 +            CompleteFailbackResponseMessage response = new CompleteFailbackResponseMessage(msg.getPlanId(),
 +                    msg.getRequestId(), msg.getPartitions());
 +            sendMessage(response, null);
 +        }
 +    }
 +}
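
For readers tracking the takeover change above: the new guard has two parts worth
noting. A shutting-down NC ignores the request outright (it must not adopt new
partitions), and when a takeover does run, the acknowledgement to the CC is sent
from a finally block so that even a failed takeover unblocks the coordinator.
A minimal, self-contained sketch of that shape (hypothetical names throughout;
these are not the actual AsterixDB classes):

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

public class TakeoverGuardSketch {
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

    // Stands in for sendMessage(response, null) back to the CC.
    interface Responder {
        void send(String response);
    }

    void handleTakeover(int[] partitions, Responder responder) throws Exception {
        if (shuttingDown.get()) {
            return; // a node that is going away must not adopt new partitions
        }
        try {
            takeover(partitions); // may throw; the response below is still sent
        } finally {
            responder.send("TAKEOVER_COMPLETE " + Arrays.toString(partitions));
        }
    }

    private void takeover(int[] partitions) { /* recovery work elided */ }

    public static void main(String[] args) throws Exception {
        new TakeoverGuardSketch().handleTakeover(new int[] { 0, 1 }, System.out::println);
    }
}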

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
index 8d020e7,0000000..e372d31
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
@@@ -1,106 -1,0 +1,107 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.test.runtime;
 +
 +import java.io.File;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.logging.Logger;
 +
 +import org.apache.asterix.app.external.TestLibrarian;
 +import org.apache.asterix.common.config.AsterixTransactionProperties;
 +import org.apache.asterix.test.aql.TestExecutor;
 +import org.apache.asterix.testframework.context.TestCaseContext;
 +import org.apache.commons.lang3.StringUtils;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.runner.RunWith;
 +import org.junit.runners.Parameterized;
 +import org.junit.runners.Parameterized.Parameters;
 +
 +/**
 + * Runs the runtime test cases under 'asterix-app/src/test/resources/runtimets'.
 + */
 +@RunWith(Parameterized.class)
 +public class ExecutionTest {
 +
 +    protected static final Logger LOGGER = Logger.getLogger(ExecutionTest.class.getName());
 +
 +    protected static final String PATH_ACTUAL = "rttest" + File.separator;
 +    protected static final String PATH_BASE = StringUtils.join(new String[] { "src", "test", "resources", "runtimets" },
 +            File.separator);
 +
 +    protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
- 
 +    protected static AsterixTransactionProperties txnProperties;
-     private final static TestExecutor testExecutor = new TestExecutor();
++    private static final TestExecutor testExecutor = new TestExecutor();
++    private static final boolean cleanupOnStart = true;
++    private static final boolean cleanupOnStop = true;
 +
 +    @BeforeClass
 +    public static void setUp() throws Exception {
 +        try {
 +            File outdir = new File(PATH_ACTUAL);
 +            outdir.mkdirs();
 +            // remove library directory
 +            TestLibrarian.removeLibraryDir();
 +            testExecutor.setLibrarian(new TestLibrarian());
-             ExecutionTestUtil.setUp();
++            ExecutionTestUtil.setUp(cleanupOnStart);
 +        } catch (Throwable th) {
 +            th.printStackTrace();
 +            throw th;
 +        }
 +    }
 +
 +    @AfterClass
 +    public static void tearDown() throws Exception {
 +        // remove library directory
 +        TestLibrarian.removeLibraryDir();
-         ExecutionTestUtil.tearDown();
++        ExecutionTestUtil.tearDown(cleanupOnStop);
 +    }
 +
 +    @Parameters(name = "ExecutionTest {index}: {0}")
 +    public static Collection<Object[]> tests() throws Exception {
 +        Collection<Object[]> testArgs = buildTestsInXml(TestCaseContext.ONLY_TESTSUITE_XML_NAME);
 +        if (testArgs.isEmpty()) {
 +            testArgs = buildTestsInXml(TestCaseContext.DEFAULT_TESTSUITE_XML_NAME);
 +        }
 +        return testArgs;
 +    }
 +
 +    protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
 +        Collection<Object[]> testArgs = new ArrayList<Object[]>();
 +        TestCaseContext.Builder b = new TestCaseContext.Builder();
 +        for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
 +            testArgs.add(new Object[] { ctx });
 +        }
 +        return testArgs;
 +    }
 +
 +    protected TestCaseContext tcCtx;
 +
 +    public ExecutionTest(TestCaseContext tcCtx) {
 +        this.tcCtx = tcCtx;
 +    }
 +
 +    @Test
 +    public void test() throws Exception {
 +        testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, ExecutionTestUtil.FailedGroup);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 5e76ecb,0000000..d919c92
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@@ -1,112 -1,0 +1,112 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.test.runtime;
 +
 +import java.io.File;
 +import java.nio.file.Paths;
 +import java.util.logging.Level;
 +import java.util.logging.Logger;
 +
 +import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 +import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 +import org.apache.asterix.common.config.GlobalConfig;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.IdentitiyResolverFactory;
 +import org.apache.asterix.testframework.xml.TestGroup;
 +import org.apache.asterix.testframework.xml.TestSuite;
 +import org.apache.hyracks.control.nc.NodeControllerService;
 +import org.apache.hyracks.storage.common.buffercache.BufferCache;
 +
 +public class ExecutionTestUtil {
 +
 +    protected static final Logger LOGGER = Logger.getLogger(ExecutionTestUtil.class.getName());
 +
 +    protected static final String PATH_ACTUAL = "rttest" + File.separator;
 +
 +    protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
 +
 +    protected static TestGroup FailedGroup;
 +
-     public static void setUp() throws Exception {
++    public static void setUp(boolean cleanup) throws Exception {
 +        System.out.println("Starting setup");
 +        if (LOGGER.isLoggable(Level.INFO)) {
 +            LOGGER.info("Starting setup");
 +        }
 +        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
 +
 +        if (LOGGER.isLoggable(Level.INFO)) {
 +            LOGGER.info("initializing pseudo cluster");
 +        }
-         AsterixHyracksIntegrationUtil.init(true);
++        AsterixHyracksIntegrationUtil.init(cleanup);
 +
 +        if (LOGGER.isLoggable(Level.INFO)) {
 +            LOGGER.info("initializing HDFS");
 +        }
 +
 +        HDFSCluster.getInstance().setup();
 +
 +        // Set the node resolver to be the identity resolver that expects node names
 +        // to be node controller ids; a valid assumption in the test environment.
 +        System.setProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY,
 +                IdentitiyResolverFactory.class.getName());
 +
 +        FailedGroup = new TestGroup();
 +        FailedGroup.setName("failed");
 +    }
 +
 +    private static void validateBufferCacheState() {
 +        for (NodeControllerService nc : AsterixHyracksIntegrationUtil.ncs) {
 +            IAsterixAppRuntimeContext appCtx = (IAsterixAppRuntimeContext) nc.getApplicationContext()
 +                    .getApplicationObject();
 +            if (!((BufferCache) appCtx.getBufferCache()).isClean()) {
 +                throw new IllegalStateException();
 +            }
 +        }
 +    }
 +
-     public static void tearDown() throws Exception {
++    public static void tearDown(boolean cleanup) throws Exception {
 +        // validateBufferCacheState(); <-- Commented out until bug is fixed -->
-         AsterixHyracksIntegrationUtil.deinit(true);
++        AsterixHyracksIntegrationUtil.deinit(cleanup);
 +        File outdir = new File(PATH_ACTUAL);
 +        File[] files = outdir.listFiles();
 +        if (files == null || files.length == 0) {
 +            outdir.delete();
 +        }
 +        HDFSCluster.getInstance().cleanup();
 +
 +        if (FailedGroup != null && FailedGroup.getTestCase().size() > 0) {
 +            File temp = File.createTempFile("failed", ".xml");
 +            javax.xml.bind.JAXBContext jaxbCtx = javax.xml.bind.JAXBContext
 +                    .newInstance(TestSuite.class.getPackage().getName());
 +            javax.xml.bind.Marshaller marshaller = jaxbCtx.createMarshaller();
 +            marshaller.setProperty(javax.xml.bind.Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
 +            TestSuite failedSuite = new TestSuite();
 +            failedSuite.setResultOffsetPath("results");
 +            failedSuite.setQueryOffsetPath("queries");
 +            failedSuite.getTestGroup().add(FailedGroup);
 +            marshaller.marshal(failedSuite, temp);
 +            System.err.println("The failed.xml was written to: " + temp.getAbsolutePath()
 +                    + ". You can copy it to only.xml with the following command:\ncp " + temp.getAbsolutePath() + " "
 +                    + Paths.get("./src/test/resources/runtimets/only.xml").toAbsolutePath());
 +        }
 +    }
 +
 +}
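
The point of threading a boolean through setUp/tearDown above is that a suite can
now preserve the pseudo cluster's instance data across a restart (e.g. for
recovery-style tests) instead of always wiping it. A minimal sketch of the
intended call pattern, with hypothetical init/deinit stand-ins for
AsterixHyracksIntegrationUtil:

public class CleanupFlagSketch {
    static void init(boolean deleteOldInstanceData) {
        if (deleteOldInstanceData) {
            System.out.println("wiping storage and transaction logs");
        }
        System.out.println("starting pseudo cluster");
    }

    static void deinit(boolean deleteOldInstanceData) {
        System.out.println("stopping pseudo cluster");
        if (deleteOldInstanceData) {
            System.out.println("wiping storage and transaction logs");
        }
    }

    public static void main(String[] args) {
        init(true);    // ordinary run: start from a clean slate
        deinit(false); // keep the data on shutdown...
        init(false);   // ...so a restart can recover against it
        deinit(true);  // final teardown removes everything
    }
}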

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
index cbb14c5,0000000..b827a0d
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
@@@ -1,100 -1,0 +1,102 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.test.runtime;
 +
 +import java.io.File;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.logging.Logger;
 +
 +import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 +import org.apache.asterix.common.config.AsterixTransactionProperties;
 +import org.apache.asterix.test.aql.TestExecutor;
 +import org.apache.asterix.testframework.context.TestCaseContext;
 +import org.apache.asterix.testframework.xml.TestGroup;
 +import org.apache.commons.lang3.StringUtils;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.runner.RunWith;
 +import org.junit.runners.Parameterized;
 +import org.junit.runners.Parameterized.Parameters;
 +
 +/**
 + * Runs the runtime test cases under 'asterix-app/src/test/resources/runtimets'.
 + */
 +@RunWith(Parameterized.class)
 +public class SqlppExecutionTest {
 +
 +    protected static final Logger LOGGER = Logger.getLogger(SqlppExecutionTest.class.getName());
 +
 +    protected static final String PATH_ACTUAL = "rttest" + File.separator;
 +    protected static final String PATH_BASE = StringUtils.join(new String[] { "src", "test", "resources", "runtimets" },
 +            File.separator);
 +
 +    protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
 +
 +    protected static AsterixTransactionProperties txnProperties;
-     private final static TestExecutor testExecutor = new TestExecutor();
++    private static final TestExecutor testExecutor = new TestExecutor();
++    private static final boolean cleanupOnStart = true;
++    private static final boolean cleanupOnStop = true;
 +
 +    protected static TestGroup FailedGroup;
 +
 +    @BeforeClass
 +    public static void setUp() throws Exception {
 +        File outdir = new File(PATH_ACTUAL);
 +        outdir.mkdirs();
-         ExecutionTestUtil.setUp();
++        ExecutionTestUtil.setUp(cleanupOnStart);
 +    }
 +
 +    @AfterClass
 +    public static void tearDown() throws Exception {
-         ExecutionTestUtil.tearDown();
++        ExecutionTestUtil.tearDown(cleanupOnStop);
 +        AsterixHyracksIntegrationUtil.removeTestStorageFiles();
 +    }
 +
 +    @Parameters(name = "SqlppExecutionTest {index}: {0}")
 +    public static Collection<Object[]> tests() throws Exception {
 +        Collection<Object[]> testArgs = buildTestsInXml("only_sqlpp.xml");
 +        if (testArgs.isEmpty()) {
 +            testArgs = buildTestsInXml("testsuite_sqlpp.xml");
 +        }
 +        return testArgs;
 +    }
 +
 +    protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
 +        Collection<Object[]> testArgs = new ArrayList<Object[]>();
 +        TestCaseContext.Builder b = new TestCaseContext.Builder();
 +        for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
 +            testArgs.add(new Object[] { ctx });
 +        }
 +        return testArgs;
 +    }
 +
 +    protected TestCaseContext tcCtx;
 +
 +    public SqlppExecutionTest(TestCaseContext tcCtx) {
 +        this.tcCtx = tcCtx;
 +    }
 +
 +    @Test
 +    public void test() throws Exception {
 +        testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, FailedGroup);
 +    }
 +}
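
Both runtime test classes follow the same JUnit 4 Parameterized recipe: tests()
yields one Object[] per test case read from the suite XML, and the runner
constructs the class once per row, passing the row to the constructor. A
stripped-down, runnable sketch of that recipe (hypothetical case names):

import java.util.ArrayList;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class ParameterizedSketch {

    @Parameters(name = "ParameterizedSketch {index}: {0}")
    public static Collection<Object[]> tests() {
        Collection<Object[]> testArgs = new ArrayList<Object[]>();
        for (String name : new String[] { "case-a", "case-b" }) {
            testArgs.add(new Object[] { name });
        }
        return testArgs;
    }

    private final String caseName;

    public ParameterizedSketch(String caseName) {
        this.caseName = caseName;
    }

    @Test
    public void test() {
        System.out.println("running " + caseName);
    }
}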

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
index d6cf231,0000000..7f7fbb4
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
@@@ -1,181 -1,0 +1,182 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.test.sqlpp;
 +
 +import static org.mockito.Mockito.mock;
 +import static org.mockito.Mockito.when;
 +
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.PrintWriter;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.asterix.common.config.GlobalConfig;
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.lang.common.base.IParser;
 +import org.apache.asterix.lang.common.base.IParserFactory;
 +import org.apache.asterix.lang.common.base.IQueryRewriter;
 +import org.apache.asterix.lang.common.base.IRewriterFactory;
 +import org.apache.asterix.lang.common.base.Statement;
 +import org.apache.asterix.lang.common.base.Statement.Kind;
 +import org.apache.asterix.lang.common.rewrites.LangRewritingContext;
 +import org.apache.asterix.lang.common.statement.DataverseDecl;
 +import org.apache.asterix.lang.common.statement.FunctionDecl;
 +import org.apache.asterix.lang.common.statement.Query;
 +import org.apache.asterix.lang.common.util.FunctionUtil;
 +import org.apache.asterix.lang.sqlpp.parser.SqlppParserFactory;
 +import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
 +import org.apache.asterix.lang.sqlpp.util.SqlppAstPrintUtil;
 +import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 +import org.apache.asterix.test.aql.TestExecutor;
 +import org.apache.asterix.testframework.context.TestCaseContext;
 +import org.apache.asterix.testframework.context.TestFileContext;
 +import org.apache.asterix.testframework.xml.TestCase.CompilationUnit;
 +import org.apache.asterix.testframework.xml.TestGroup;
 +
 +import junit.extensions.PA;
 +
 +public class ParserTestExecutor extends TestExecutor {
 +
 +    private IParserFactory sqlppParserFactory = new SqlppParserFactory();
 +    private IRewriterFactory sqlppRewriterFactory = new SqlppRewriterFactory();
 +
 +    @Override
 +    public void executeTest(String actualPath, TestCaseContext testCaseCtx, ProcessBuilder pb,
 +            boolean isDmlRecoveryTest, TestGroup failedGroup) throws Exception {
 +        int queryCount = 0;
 +        List<CompilationUnit> cUnits = testCaseCtx.getTestCase().getCompilationUnit();
 +        for (CompilationUnit cUnit : cUnits) {
 +            LOGGER.info(
 +                    "Starting [TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " ... ");
 +            List<TestFileContext> testFileCtxs = testCaseCtx.getTestFiles(cUnit);
 +            List<TestFileContext> expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit);
 +            for (TestFileContext ctx : testFileCtxs) {
 +                File testFile = ctx.getFile();
 +                try {
 +                    if (queryCount >= expectedResultFileCtxs.size()) {
 +                        throw new IllegalStateException("no result file for " + testFile.toString() + "; queryCount: "
 +                                + queryCount + ", expectedResultFileCtxs.size: " + expectedResultFileCtxs.size());
 +                    }
 +
 +                    // Runs the test query.
 +                    File actualResultFile = testCaseCtx.getActualResultFile(cUnit, new File(actualPath));
 +                    File expectedResultFile = expectedResultFileCtxs.get(queryCount).getFile();
 +                    testSQLPPParser(testFile, actualResultFile, expectedResultFile);
 +
 +                    LOGGER.info(
 +                            "[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " PASSED ");
 +                    queryCount++;
 +                } catch (Exception e) {
 +                    System.err.println("testFile " + testFile.toString() + " raised an exception:");
 +                    e.printStackTrace();
 +                    if (cUnit.getExpectedError().isEmpty()) {
 +                        System.err.println("...Unexpected!");
 +                        if (failedGroup != null) {
 +                            failedGroup.getTestCase().add(testCaseCtx.getTestCase());
 +                        }
 +                        throw new Exception("Test \"" + testFile + "\" FAILED!", e);
 +                    } else {
 +                        LOGGER.info("[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName()
 +                                + " failed as expected: " + e.getMessage());
 +                        System.err.println("...but that was expected.");
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    // Tests the SQL++ parser.
 +    public void testSQLPPParser(File queryFile, File actualResultFile, File expectedFile) throws Exception {
 +        actualResultFile.getParentFile().mkdirs();
 +        PrintWriter writer = new PrintWriter(new FileOutputStream(actualResultFile));
 +        IParser parser = sqlppParserFactory.createParser(readTestFile(queryFile));
 +        GlobalConfig.ASTERIX_LOGGER.info(queryFile.toString());
 +        try {
 +            List<Statement> statements = parser.parse();
 +            List<FunctionDecl> functions = getDeclaredFunctions(statements);
 +            String dvName = getDefaultDataverse(statements);
 +            AqlMetadataProvider aqlMetadataProvider = mock(AqlMetadataProvider.class);
 +
 +            @SuppressWarnings("unchecked")
 +            Map<String, String> config = mock(Map.class);
 +            when(aqlMetadataProvider.getDefaultDataverseName()).thenReturn(dvName);
 +            when(aqlMetadataProvider.getConfig()).thenReturn(config);
 +            when(config.get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS)).thenReturn("true");
 +
 +            for (Statement st : statements) {
 +                if (st.getKind() == Kind.QUERY) {
 +                    Query query = (Query) st;
 +                    IQueryRewriter rewriter = sqlppRewriterFactory.createQueryRewriter();
 +                    rewrite(rewriter, functions, query, aqlMetadataProvider,
 +                            new LangRewritingContext(query.getVarCounter()));
 +                }
 +                SqlppAstPrintUtil.print(st, writer);
 +            }
 +            writer.close();
 +            // Compares the actual result and the expected result.
 +            runScriptAndCompareWithResult(queryFile, new PrintWriter(System.err), expectedFile, actualResultFile);
 +        } catch (Exception e) {
 +            GlobalConfig.ASTERIX_LOGGER.warning("Failed while testing file " + queryFile);
 +            throw e;
 +        } finally {
 +            writer.close();
 +        }
 +    }
 +
 +    // Extracts declared functions.
 +    private List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
 +        List<FunctionDecl> functionDecls = new ArrayList<FunctionDecl>();
 +        for (Statement st : statements) {
 +            if (st.getKind().equals(Statement.Kind.FUNCTION_DECL)) {
 +                functionDecls.add((FunctionDecl) st);
 +            }
 +        }
 +        return functionDecls;
 +    }
 +
 +    // Gets the default dataverse for the input statements.
 +    private String getDefaultDataverse(List<Statement> statements) {
 +        for (Statement st : statements) {
 +            if (st.getKind().equals(Statement.Kind.DATAVERSE_DECL)) {
 +                DataverseDecl dv = (DataverseDecl) st;
 +                return dv.getDataverseName().getValue();
 +            }
 +        }
 +        return null;
 +    }
 +
 +    // Rewrite queries.
 +    // Note: we do not do inline function rewriting here because this needs real
 +    // metadata access.
 +    private void rewrite(IQueryRewriter rewriter, List<FunctionDecl> declaredFunctions, Query topExpr,
 +            AqlMetadataProvider metadataProvider, LangRewritingContext context) throws AsterixException {
 +        PA.invokeMethod(rewriter,
 +                "setup(java.util.List, org.apache.asterix.lang.common.statement.Query, org.apache.asterix.metadata.declared.AqlMetadataProvider, "
 +                        + "org.apache.asterix.lang.common.rewrites.LangRewritingContext)",
 +                declaredFunctions, topExpr, metadataProvider, context);
 +        PA.invokeMethod(rewriter, "inlineColumnAlias()");
++        PA.invokeMethod(rewriter, "rewriteGlobalAggregations()");
 +        PA.invokeMethod(rewriter, "rewriteGroupBys()");
 +        PA.invokeMethod(rewriter, "variableCheckAndRewrite(boolean)", Boolean.TRUE);
 +    }
 +
 +}
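
The rewrite() helper above drives individual private rewriter phases through
junit-addons' PA.invokeMethod, which is how the newly added
rewriteGlobalAggregations phase gets exercised in isolation. A self-contained
sketch of the same idea using plain java.lang.reflect (hypothetical Rewriter
class; not the SQL++ IQueryRewriter):

import java.lang.reflect.Method;

public class ReflectiveRewriteSketch {

    static class Rewriter {
        private boolean aggregatesRewritten;

        // A private phase normally driven only by the full compile pipeline.
        private void rewriteGlobalAggregations() {
            aggregatesRewritten = true;
        }
    }

    public static void main(String[] args) throws Exception {
        Rewriter rewriter = new Rewriter();
        // setAccessible(true) lets the test invoke one phase in isolation,
        // essentially what PA.invokeMethod does under the hood.
        Method phase = Rewriter.class.getDeclaredMethod("rewriteGlobalAggregations");
        phase.setAccessible(true);
        phase.invoke(rewriter);
        System.out.println("aggregates rewritten: " + rewriter.aggregatesRewritten);
    }
}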

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/count-tweets.sqlpp
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/count-tweets.sqlpp
index a2ddf4b,0000000..0bea252
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/count-tweets.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/count-tweets.sqlpp
@@@ -1,43 -1,0 +1,43 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +drop  database twitter if exists;
 +create  database twitter;
 +
 +use twitter;
 +
 +
 +create type twitter.Tweet as
 +{
 +  id : int32,
 +  tweetid : int64,
 +  loc : point,
 +  time : datetime,
 +  text : string
 +}
 +
 +create external  table TwitterData(Tweet) using localfs(("path"="asterix_nc1://data/twitter/smalltweets.txt"),("format"="adm"));
 +
 +write output to asterix_nc1:"/tmp/count-tweets.adm"
- select element {'word':tok,'count':twitter.count(token)}
++select element {'word':tok,'count':count(token)}
 +from  TwitterData as t,
 +      tokens as token
 +with  tokens as twitter."word-tokens"(t.text)
 +group by token as tok
 +;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/denorm-cust-order.sqlpp
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/denorm-cust-order.sqlpp
index b5fc4ea,0000000..d8e2b7a
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/denorm-cust-order.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/denorm-cust-order.sqlpp
@@@ -1,70 -1,0 +1,70 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +drop  database custorder if exists;
 +create  database custorder;
 +
 +use custorder;
 +
 +
 +create type custorder.AddressType as
 + closed {
 +  number : int32,
 +  street : string,
 +  city : string
 +}
 +
 +create type custorder.CustomerType as
 + closed {
 +  cid : int32,
 +  name : string,
 +  age : int32?,
 +  address : AddressType?,
 +  lastorder : {
 +      oid : int32,
 +      total : float
 +  }
 +
 +}
 +
 +create type custorder.OrderType as
 + closed {
 +  oid : int32,
 +  cid : int32,
 +  orderstatus : string,
 +  orderpriority : string,
 +  clerk : string,
 +  total : float
 +}
 +
- create  nodegroup group1 if not exists  on 
++create  nodegroup group1 if not exists  on
 +    asterix_nc1,
 +    asterix_nc2
 +;
 +create  table Customers(CustomerType) primary key cid on group1;
 +
 +create  table Orders(OrderType) primary key oid on group1;
 +
 +write output to asterix_nc1:"/tmp/custorder.adm"
- select element {'cid':cid,'cust':cust,'cnt-orders':custorder.count(o),'orders':o}
++select element {'cid':cid,'cust':cust,'cnt-orders':count(o),'orders':o}
 +from  Customers as c,
 +      Orders as o
 +where (c.cid = o.cid)
 +group by c.cid as cid
 +;