You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2016/04/07 17:00:09 UTC
[34/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master'
into hyracks-merge2
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index c67eb70,0000000..cc50b75
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@@ -1,211 -1,0 +1,232 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.common;
+
+import java.io.File;
++import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.asterix.common.config.AsterixPropertiesAccessor;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
+import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class AsterixHyracksIntegrationUtil {
+
+ private static final String IO_DIR_KEY = "java.io.tmpdir";
+ public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
+ public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
+
+ public static ClusterControllerService cc;
+ public static NodeControllerService[] ncs;
+ public static IHyracksClientConnection hcc;
+
+ private static AsterixPropertiesAccessor propertiesAccessor;
+
+ public static void init(boolean deleteOldInstanceData) throws Exception {
+ propertiesAccessor = new AsterixPropertiesAccessor();
+ ncs = new NodeControllerService[propertiesAccessor.getNodeNames().size()];
+ if (deleteOldInstanceData) {
+ deleteTransactionLogs();
+ removeTestStorageFiles();
+ }
+
+ CCConfig ccConfig = new CCConfig();
+ ccConfig.clusterNetIpAddress = "127.0.0.1";
+ ccConfig.clientNetIpAddress = "127.0.0.1";
+ ccConfig.clientNetPort = DEFAULT_HYRACKS_CC_CLIENT_PORT;
+ ccConfig.clusterNetPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
+ ccConfig.defaultMaxJobAttempts = 0;
+ ccConfig.resultTTL = 30000;
+ ccConfig.resultSweepThreshold = 1000;
+ ccConfig.appCCMainClass = CCApplicationEntryPoint.class.getName();
+ // ccConfig.useJOL = true;
+ cc = new ClusterControllerService(ccConfig);
+ cc.start();
+
+ // Starts ncs.
+ int n = 0;
+ List<String> nodes = propertiesAccessor.getNodeNames();
+ for (String ncName : nodes) {
+ NCConfig ncConfig1 = new NCConfig();
+ ncConfig1.ccHost = "localhost";
+ ncConfig1.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
+ ncConfig1.clusterNetIPAddress = "127.0.0.1";
+ ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.resultIPAddress = "127.0.0.1";
+ ncConfig1.nodeId = ncName;
+ ncConfig1.resultTTL = 30000;
+ ncConfig1.resultSweepThreshold = 1000;
+ ncConfig1.appArgs = Arrays.asList("-virtual-NC");
+ String tempPath = System.getProperty(IO_DIR_KEY);
+ if (tempPath.endsWith(File.separator)) {
+ tempPath = tempPath.substring(0, tempPath.length() - 1);
+ }
+ System.err.println("Using the path: " + tempPath);
+ // get initial partitions from properties
+ String[] nodeStores = propertiesAccessor.getStores().get(ncName);
+ if (nodeStores == null) {
+ throw new Exception("Coudn't find stores for NC: " + ncName);
+ }
+ String tempDirPath = System.getProperty(IO_DIR_KEY);
+ if (!tempDirPath.endsWith(File.separator)) {
+ tempDirPath += File.separator;
+ }
+ for (int p = 0; p < nodeStores.length; p++) {
+ // create IO devices based on stores
+ String iodevicePath = tempDirPath + ncConfig1.nodeId + File.separator + nodeStores[p];
+ File ioDeviceDir = new File(iodevicePath);
+ ioDeviceDir.mkdirs();
+ if (p == 0) {
+ ncConfig1.ioDevices = iodevicePath;
+ } else {
+ ncConfig1.ioDevices += "," + iodevicePath;
+ }
+ }
+ ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
+ NodeControllerService nodeControllerService = new NodeControllerService(ncConfig1);
+ ncs[n] = nodeControllerService;
+ Thread ncStartThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ nodeControllerService.start();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ ncStartThread.start();
+ ++n;
+ }
+ hcc = new HyracksConnection(cc.getConfig().clientNetIpAddress, cc.getConfig().clientNetPort);
+ }
+
+ public static String[] getNcNames() {
+ return propertiesAccessor.getNodeNames().toArray(new String[propertiesAccessor.getNodeNames().size()]);
+ }
+
+ public static IHyracksClientConnection getHyracksClientConnection() {
+ return hcc;
+ }
+
+ public static void deinit(boolean deleteOldInstanceData) throws Exception {
++ //stop NCs
++ ArrayList<Thread> stopNCThreads = new ArrayList<>();
+ for (int n = 0; n < ncs.length; ++n) {
- if (ncs[n] != null)
- ncs[n].stop();
++ NodeControllerService nodeControllerService = ncs[n];
++ if (nodeControllerService != null) {
++ Thread ncStopThread = new Thread() {
++ @Override
++ public void run() {
++ try {
++ nodeControllerService.stop();
++ } catch (Exception e) {
++ e.printStackTrace();
++ }
++ }
++ };
++ stopNCThreads.add(ncStopThread);
++ ncStopThread.start();
++ }
++ }
+
++ //make sure all NCs have stopped
++ for (Thread stopNcTheard : stopNCThreads) {
++ stopNcTheard.join();
+ }
++
+ if (cc != null) {
+ cc.stop();
+ }
+
+ if (deleteOldInstanceData) {
+ deleteTransactionLogs();
+ removeTestStorageFiles();
+ }
+ }
+
+ public static void runJob(JobSpecification spec) throws Exception {
+ GlobalConfig.ASTERIX_LOGGER.info(spec.toJSON().toString());
+ JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ GlobalConfig.ASTERIX_LOGGER.info(jobId.toString());
+ hcc.waitForCompletion(jobId);
+ }
+
+ public static void removeTestStorageFiles() {
+ File dir = new File(System.getProperty(IO_DIR_KEY));
+ for (String ncName : propertiesAccessor.getNodeNames()) {
+ File ncDir = new File(dir, ncName);
+ FileUtils.deleteQuietly(ncDir);
+ }
+ }
+
+ private static void deleteTransactionLogs() throws Exception {
+ for (String ncId : propertiesAccessor.getNodeNames()) {
+ File log = new File(propertiesAccessor.getTransactionLogDirs().get(ncId));
+ if (log.exists()) {
+ FileUtils.deleteDirectory(log);
+ }
+ }
+ }
+
+ /**
+ * main method to run a simple 2 node cluster in-process
+ * suggested VM arguments: <code>-enableassertions -Xmx2048m -Dfile.encoding=UTF-8</code>
+ *
+ * @param args
+ * unused
+ */
+ public static void main(String[] args) {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ deinit(false);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ try {
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration.xml");
+
+ init(false);
+ while (true) {
+ Thread.sleep(10000);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index 5cd490a,0000000..d8f1893
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@@ -1,273 -1,0 +1,273 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import java.util.Collection;
+import java.util.List;
++import java.util.Set;
++import java.util.TreeSet;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.feed.api.IFeedJoint;
+import org.apache.asterix.external.feed.api.IFeedMessage;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.message.EndFeedMessage;
+import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
+import org.apache.asterix.external.feed.message.PrepareStallMessage;
+import org.apache.asterix.external.feed.message.TerminateDataFlowMessage;
+import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
+import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor;
+import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.file.JobSpecificationUtils;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
+
+/**
+ * Provides helper method(s) for creating JobSpec for operations on a feed.
+ */
+public class FeedOperations {
+
+ /**
+ * Builds the job spec for ingesting a (primary) feed from its external source via the feed adaptor.
++ *
+ * @param primaryFeed
+ * @param metadataProvider
+ * @return a pair of the Hyracks job specification for receiving data from the external source and the adapter factory
+ * @throws Exception
+ */
+ public static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed primaryFeed,
+ AqlMetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
-
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE);
+ IAdapterFactory adapterFactory = null;
+ IOperatorDescriptor feedIngestor;
+ AlgebricksPartitionConstraint ingesterPc;
-
- try {
- Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t = metadataProvider
- .buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor);
- feedIngestor = t.first;
- ingesterPc = t.second;
- adapterFactory = t.third;
- } catch (AlgebricksException e) {
- e.printStackTrace();
- throw new AsterixException(e);
- }
-
++ Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t = metadataProvider
++ .buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor);
++ feedIngestor = t.first;
++ ingesterPc = t.second;
++ adapterFactory = t.third;
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc);
-
+ NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc);
+ spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0);
+ spec.addRoot(nullSink);
+ return new Pair<JobSpecification, IAdapterFactory>(spec, adapterFactory);
+ }
+
+ public static JobSpecification buildDiscontinueFeedSourceSpec(AqlMetadataProvider metadataProvider, FeedId feedId)
+ throws AsterixException, AlgebricksException {
+
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ IOperatorDescriptor feedMessenger = null;
+ AlgebricksPartitionConstraint messengerPc = null;
+
+ List<String> locations = FeedLifecycleListener.INSTANCE.getIntakeLocations(feedId);
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDiscontinueFeedMessengerRuntime(spec, feedId,
+ locations);
+
+ feedMessenger = p.first;
+ messengerPc = p.second;
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
+ NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
+ spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
+ spec.addRoot(nullSink);
+
+ return spec;
+ }
+
+ /**
+ * Builds the job spec for sending a message to an active feed to disconnect it from
+ * its source.
+ */
+ public static Pair<JobSpecification, Boolean> buildDisconnectFeedJobSpec(AqlMetadataProvider metadataProvider,
+ FeedConnectionId connectionId) throws AsterixException, AlgebricksException {
+
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ IOperatorDescriptor feedMessenger;
+ AlgebricksPartitionConstraint messengerPc;
+ List<String> locations = null;
+ FeedRuntimeType sourceRuntimeType;
+ try {
+ FeedConnectJobInfo cInfo = FeedLifecycleListener.INSTANCE.getFeedConnectJobInfo(connectionId);
+ IFeedJoint sourceFeedJoint = cInfo.getSourceFeedJoint();
+ IFeedJoint computeFeedJoint = cInfo.getComputeFeedJoint();
+
+ boolean terminateIntakeJob = false;
+ boolean completeDisconnect = computeFeedJoint == null || computeFeedJoint.getReceivers().isEmpty();
+ if (completeDisconnect) {
+ sourceRuntimeType = FeedRuntimeType.INTAKE;
+ locations = cInfo.getCollectLocations();
+ terminateIntakeJob = sourceFeedJoint.getReceivers().size() == 1;
+ } else {
+ locations = cInfo.getComputeLocations();
+ sourceRuntimeType = FeedRuntimeType.COMPUTE;
+ }
+
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDisconnectFeedMessengerRuntime(spec,
+ connectionId, locations, sourceRuntimeType, completeDisconnect, sourceFeedJoint.getOwnerFeedId());
+
+ feedMessenger = p.first;
+ messengerPc = p.second;
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
+ NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
+ spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
+ spec.addRoot(nullSink);
+ return new Pair<JobSpecification, Boolean>(spec, terminateIntakeJob);
+
+ } catch (AlgebricksException e) {
+ throw new AsterixException(e);
+ }
+
+ }
+
+ public static JobSpecification buildPrepareStallMessageJob(PrepareStallMessage stallMessage,
+ Collection<String> collectLocations) throws AsterixException {
+ JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+ try {
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+ messageJobSpec, stallMessage.getConnectionId(), stallMessage, collectLocations);
+ buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+ } catch (AlgebricksException ae) {
+ throw new AsterixException(ae);
+ }
+ return messageJobSpec;
+ }
+
+ public static JobSpecification buildNotifyThrottlingEnabledMessageJob(
+ ThrottlingEnabledFeedMessage throttlingEnabledMesg, Collection<String> locations) throws AsterixException {
+ JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+ try {
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+ messageJobSpec, throttlingEnabledMesg.getConnectionId(), throttlingEnabledMesg, locations);
+ buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+ } catch (AlgebricksException ae) {
+ throw new AsterixException(ae);
+ }
+ return messageJobSpec;
+ }
+
+ public static JobSpecification buildTerminateFlowMessageJob(TerminateDataFlowMessage terminateMessage,
+ List<String> collectLocations) throws AsterixException {
+ JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+ try {
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+ messageJobSpec, terminateMessage.getConnectionId(), terminateMessage, collectLocations);
+ buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+ } catch (AlgebricksException ae) {
+ throw new AsterixException(ae);
+ }
+ return messageJobSpec;
+ }
+
+ public static JobSpecification buildCommitAckResponseJob(FeedTupleCommitResponseMessage commitResponseMessage,
+ Collection<String> targetLocations) throws AsterixException {
+ JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+ try {
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+ messageJobSpec, commitResponseMessage.getConnectionId(), commitResponseMessage, targetLocations);
+ buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+ } catch (AlgebricksException ae) {
+ throw new AsterixException(ae);
+ }
+ return messageJobSpec;
+ }
+
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDiscontinueFeedMessengerRuntime(
+ JobSpecification jobSpec, FeedId feedId, List<String> locations) throws AlgebricksException {
+ FeedConnectionId feedConnectionId = new FeedConnectionId(feedId, null);
+ IFeedMessage feedMessage = new EndFeedMessage(feedConnectionId, FeedRuntimeType.INTAKE,
+ feedConnectionId.getFeedId(), true, EndFeedMessage.EndMessageType.DISCONTINUE_SOURCE);
+ return buildSendFeedMessageRuntime(jobSpec, feedConnectionId, feedMessage, locations);
+ }
+
+ private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildSendFeedMessageRuntime(
+ JobSpecification jobSpec, FeedConnectionId feedConenctionId, IFeedMessage feedMessage,
+ Collection<String> locations) throws AlgebricksException {
+ AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+ locations.toArray(new String[] {}));
+ FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, feedConenctionId,
+ feedMessage);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
+ }
+
+ private static JobSpecification buildSendFeedMessageJobSpec(IOperatorDescriptor operatorDescriptor,
+ AlgebricksPartitionConstraint messengerPc, JobSpecification messageJobSpec) {
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(messageJobSpec, operatorDescriptor,
+ messengerPc);
+ NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(messageJobSpec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(messageJobSpec, nullSink, messengerPc);
+ messageJobSpec.connect(new OneToOneConnectorDescriptor(messageJobSpec), operatorDescriptor, 0, nullSink, 0);
+ messageJobSpec.addRoot(nullSink);
+ return messageJobSpec;
+ }
+
+ private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
+ JobSpecification jobSpec, FeedConnectionId feedConenctionId, List<String> locations,
+ FeedRuntimeType sourceFeedRuntimeType, boolean completeDisconnection, FeedId sourceFeedId)
- throws AlgebricksException {
++ throws AlgebricksException {
+ IFeedMessage feedMessage = new EndFeedMessage(feedConenctionId, sourceFeedRuntimeType, sourceFeedId,
+ completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED);
+ return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, feedMessage, locations);
+ }
+
+ public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws Exception {
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
- AlgebricksAbsolutePartitionConstraint locations = AsterixClusterProperties.INSTANCE.getClusterLocations();
++ AlgebricksAbsolutePartitionConstraint allCluster = AsterixClusterProperties.INSTANCE.getClusterLocations();
++ Set<String> nodes = new TreeSet<>();
++ for (String node : allCluster.getLocations()) {
++ nodes.add(node);
++ }
++ AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(
++ nodes.toArray(new String[nodes.size()]));
+ FileSplit[] feedLogFileSplits = FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(),
+ locations);
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = StoragePathUtil
+ .splitProviderAndPartitionConstraints(feedLogFileSplits);
- FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first);
++ FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first, true);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second);
+ spec.addRoot(frod);
+ return spec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
index 06d2b71,0000000..9052696
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
@@@ -1,265 -1,0 +1,261 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.file;
+
+import java.io.File;
+import java.rmi.RemoteException;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.config.AsterixStorageProperties;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.formats.base.IDataFormat;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
+import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import org.apache.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import org.apache.hyracks.storage.common.file.LocalResource;
+
+public class DatasetOperations {
+
+ private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
+
+ public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
+ AqlMetadataProvider metadataProvider)
- throws AlgebricksException, HyracksDataException, RemoteException, ACIDException, AsterixException {
++ throws AlgebricksException, HyracksDataException, RemoteException, ACIDException, AsterixException {
+
+ String dataverseName = null;
+ if (datasetDropStmt.getDataverseName() != null) {
+ dataverseName = datasetDropStmt.getDataverseName();
+ } else if (metadataProvider.getDefaultDataverse() != null) {
+ dataverseName = metadataProvider.getDefaultDataverse().getDataverseName();
+ }
+
+ String datasetName = datasetDropStmt.getDatasetName();
+ String datasetPath = dataverseName + File.separator + datasetName;
+
+ LOGGER.info("DROP DATASETPATH: " + datasetPath);
+
+ Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
+ }
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ return JobSpecificationUtils.createJobSpecification();
+ }
+ boolean temp = dataset.getDatasetDetails().isTemp();
+
+ Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+ dataverseName);
+ IDataFormat format;
+ try {
+ format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+ } catch (Exception e) {
+ throw new AsterixException(e);
+ }
+
+ ARecordType itemType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
+ dataset.getItemTypeName());
+
+ ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ itemType, format.getBinaryComparatorFactoryProvider());
+ int[] filterFields = DatasetUtils.createFilterFields(dataset);
+ int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+ JobSpecification specPrimary = JobSpecificationUtils.createJobSpecification();
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), datasetName, datasetName,
+ temp);
+ AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+ metadataProvider.getMetadataTxnContext());
+
+ IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ splitsAndConstraint.first,
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ compactionInfo.first, compactionInfo.second,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+ btreeFields, filterFields, !temp));
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
+ splitsAndConstraint.second);
+
+ specPrimary.addRoot(primaryBtreeDrop);
+
+ return specPrimary;
+ }
+
+ public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
+ AqlMetadataProvider metadata) throws AsterixException, AlgebricksException {
+ String dataverseName = dataverse.getDataverseName();
+ IDataFormat format;
+ try {
+ format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+ } catch (Exception e) {
+ throw new AsterixException(e);
+ }
+ Dataset dataset = metadata.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+ }
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ ARecordType itemType = (ARecordType) metadata.findType(dataset.getItemTypeDataverseName(),
+ dataset.getItemTypeName());
+ // get meta item type
+ ARecordType metaItemType = null;
+ if (dataset.hasMetaPart()) {
+ metaItemType = (ARecordType) metadata.findType(dataset.getMetaItemTypeDataverseName(),
+ dataset.getMetaItemTypeName());
+ }
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+ itemType, format.getBinaryComparatorFactoryProvider());
+ ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
+ int[] bloomFilterKeyFields = DatasetUtils.createBloomFilterKeyFields(dataset);
+
+ ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ itemType, format.getBinaryComparatorFactoryProvider());
+ int[] filterFields = DatasetUtils.createFilterFields(dataset);
+ int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
+ FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < fs.length; i++) {
+ sb.append(stringOf(fs[i]) + " ");
+ }
+ LOGGER.info("CREATING File Splits: " + sb.toString());
+
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+ metadata.getMetadataTxnContext());
+ AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
+ ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
+ comparatorFactories, bloomFilterKeyFields, true, dataset.getDatasetId(), compactionInfo.first,
+ compactionInfo.second, filterTypeTraits, filterCmpFactories, btreeFields, filterFields);
+ ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
+ localResourceMetadata, LocalResource.LSMBTreeResource);
+
+ TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ compactionInfo.first, compactionInfo.second,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+ btreeFields, filterFields, !temp),
+ localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
+ splitsAndConstraint.second);
+ spec.addRoot(indexCreateOp);
+ return spec;
+ }
+
+ private static String stringOf(FileSplit fs) {
+ return fs.getNodeName() + ":" + fs.getLocalFile().toString();
+ }
+
+ public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, String datasetName,
+ AqlMetadataProvider metadata) throws AsterixException, AlgebricksException {
+ String dataverseName = dataverse.getDataverseName();
+ IDataFormat format;
+ try {
+ format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+ } catch (Exception e) {
+ throw new AsterixException(e);
+ }
+ Dataset dataset = metadata.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+ }
+ boolean temp = dataset.getDatasetDetails().isTemp();
-
+ ARecordType itemType = (ARecordType) metadata.findType(dataset.getItemTypeDataverseName(),
+ dataset.getItemTypeName());
++ ARecordType metaItemType = DatasetUtils.getMetaType(metadata, dataset);
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+ itemType, format.getBinaryComparatorFactoryProvider());
- ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
++ ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
+ int[] blooFilterKeyFields = DatasetUtils.createBloomFilterKeyFields(dataset);
-
+ ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ itemType, format.getBinaryComparatorFactoryProvider());
+ int[] filterFields = DatasetUtils.createFilterFields(dataset);
+ int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
-
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
-
+ AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
-
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+ metadata.getMetadataTxnContext());
+ LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ splitsAndConstraint.first, typeTraits, comparatorFactories, blooFilterKeyFields,
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ compactionInfo.first, compactionInfo.second,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+ btreeFields, filterFields, !temp),
+ NoOpOperationCallbackFactory.INSTANCE);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+ splitsAndConstraint.second);
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+ splitsAndConstraint.second);
+ spec.addRoot(compactOp);
+ return spec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
index c77ca10,0000000..d5765f1
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
@@@ -1,40 -1,0 +1,40 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.file;
+
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class DataverseOperations {
+ public static JobSpecification createDropDataverseJobSpec(Dataverse dataverse, AqlMetadataProvider metadata) {
+ JobSpecification jobSpec = JobSpecificationUtils.createJobSpecification();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForDataverse(dataverse.getDataverseName());
- FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first);
++ FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first, false);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second);
+ jobSpec.addRoot(frod);
+ return jobSpec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 0a0a917,0000000..13b0189
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@@ -1,212 -1,0 +1,215 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
+import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
+import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
+import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.common.messaging.ReplicaEventMessage;
+import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.messages.IMessage;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class NCMessageBroker implements INCMessageBroker {
+ private final static Logger LOGGER = Logger.getLogger(NCMessageBroker.class.getName());
+
+ private final NodeControllerService ncs;
+ private final AtomicLong messageId = new AtomicLong(0);
+ private final Map<Long, IApplicationMessageCallback> callbacks;
+ private final IAsterixAppRuntimeContext appContext;
+
+ public NCMessageBroker(NodeControllerService ncs) {
+ this.ncs = ncs;
+ appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ callbacks = new ConcurrentHashMap<Long, IApplicationMessageCallback>();
+ }
+
+ @Override
+ public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception {
+ if (callback != null) {
+ long uniqueMessageId = messageId.incrementAndGet();
+ message.setId(uniqueMessageId);
+ callbacks.put(uniqueMessageId, callback);
+ }
+ try {
+ ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
+ } catch (Exception e) {
+ if (callback != null) {
+ //remove the callback in case of failure
+ callbacks.remove(message.getId());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void receivedMessage(IMessage message, String nodeId) throws Exception {
+ try {
+ AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Received message: " + absMessage.getMessageType().name());
+ }
+ //if the received message is a response to a sent message, deliver it to the sender
+ IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
+ if (callback != null) {
+ callback.deliverMessageResponse(absMessage);
+ }
+
+ //handle requests from CC
+ switch (absMessage.getMessageType()) {
+ case REPORT_MAX_RESOURCE_ID_REQUEST:
+ reportMaxResourceId();
+ break;
+ case TAKEOVER_PARTITIONS_REQUEST:
+ handleTakeoverPartitons(message);
+ break;
+ case TAKEOVER_METADATA_NODE_REQUEST:
+ handleTakeoverMetadataNode(message);
+ break;
+ case PREPARE_PARTITIONS_FAILBACK_REQUEST:
+ handlePreparePartitionsFailback(message);
+ break;
+ case COMPLETE_FAILBACK_REQUEST:
+ handleCompleteFailbackRequest(message);
+ break;
+ case REPLICA_EVENT:
+ handleReplicaEvent(message);
+ break;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ private void handleTakeoverPartitons(IMessage message) throws Exception {
+ TakeoverPartitionsRequestMessage msg = (TakeoverPartitionsRequestMessage) message;
- try {
- IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
- remoteRecoeryManager.takeoverPartitons(msg.getPartitions());
- } finally {
- //send response after takeover is completed
- TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
- appContext.getTransactionSubsystem().getId(), msg.getPartitions());
- sendMessage(reponse, null);
++ //if the NC is shutting down, it should ignore takeover partitions request
++ if (!appContext.isShuttingdown()) {
++ try {
++ IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
++ remoteRecoeryManager.takeoverPartitons(msg.getPartitions());
++ } finally {
++ //send response after takeover is completed
++ TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
++ appContext.getTransactionSubsystem().getId(), msg.getPartitions());
++ sendMessage(reponse, null);
++ }
+ }
+ }
+
+ private void handleTakeoverMetadataNode(IMessage message) throws Exception {
+ try {
+ appContext.initializeMetadata(false);
+ appContext.exportMetadataNodeStub();
+ } finally {
+ TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
+ appContext.getTransactionSubsystem().getId());
+ sendMessage(reponse, null);
+ }
+ }
+
+ @Override
+ public void reportMaxResourceId() throws Exception {
+ ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage();
+ //resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for metadata indexes.
+ long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
+ maxResourceIdMsg.setMaxResourceId(maxResourceId);
+ sendMessage(maxResourceIdMsg, null);
+ }
+
+ private void handleReplicaEvent(IMessage message) {
+ ReplicaEventMessage msg = (ReplicaEventMessage) message;
+ Node node = new Node();
+ node.setId(msg.getNodeId());
+ node.setClusterIp(msg.getNodeIPAddress());
+ Replica replica = new Replica(node);
+ ReplicaEvent event = new ReplicaEvent(replica, msg.getEvent());
+ appContext.getReplicationManager().reportReplicaEvent(event);
+ }
+
+ private void handlePreparePartitionsFailback(IMessage message) throws Exception {
+ PreparePartitionsFailbackRequestMessage msg = (PreparePartitionsFailbackRequestMessage) message;
+ /**
+ * if the metadata partition will be failed back
+ * we need to flush and close all datasets including metadata datasets
+ * otherwise we need to close all non-metadata datasets and flush metadata datasets
+ * so that their memory components will be copied to the failing back node
+ */
+ if (msg.isReleaseMetadataNode()) {
+ appContext.getDatasetLifecycleManager().closeAllDatasets();
+ //remove the metadata node stub from RMI registry
+ appContext.unexportMetadataNodeStub();
+ } else {
+ //close all non-metadata datasets
+ appContext.getDatasetLifecycleManager().closeUserDatasets();
+ //flush the remaining metadata datasets that were not closed
+ appContext.getDatasetLifecycleManager().flushAllDatasets();
+ }
+
+ //mark the partitions to be closed as inactive
+ PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext
+ .getLocalResourceRepository();
+ for (Integer partitionId : msg.getPartitions()) {
+ localResourceRepo.addInactivePartition(partitionId);
+ }
+
+ //send response after partitions prepared for failback
+ PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(msg.getPlanId(),
+ msg.getRequestId(), msg.getPartitions());
+ sendMessage(reponse, null);
+ }
+
+ private void handleCompleteFailbackRequest(IMessage message) throws Exception {
+ CompleteFailbackRequestMessage msg = (CompleteFailbackRequestMessage) message;
+ try {
+ IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
+ remoteRecoeryManager.completeFailbackProcess();
+ } finally {
+ CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(msg.getPlanId(),
+ msg.getRequestId(), msg.getPartitions());
+ sendMessage(reponse, null);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
index 8d020e7,0000000..e372d31
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
@@@ -1,106 -1,0 +1,107 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.logging.Logger;
+
+import org.apache.asterix.app.external.TestLibrarian;
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.test.aql.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the runtime test cases under 'asterix-app/src/test/resources/runtimets'.
+ */
+@RunWith(Parameterized.class)
+public class ExecutionTest {
+
+ protected static final Logger LOGGER = Logger.getLogger(ExecutionTest.class.getName());
+
+ protected static final String PATH_ACTUAL = "rttest" + File.separator;
+ protected static final String PATH_BASE = StringUtils.join(new String[] { "src", "test", "resources", "runtimets" },
+ File.separator);
+
+ protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
-
+ protected static AsterixTransactionProperties txnProperties;
- private final static TestExecutor testExecutor = new TestExecutor();
++ private static final TestExecutor testExecutor = new TestExecutor();
++ private static final boolean cleanupOnStart = true;
++ private static final boolean cleanupOnStop = true;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ try {
+ File outdir = new File(PATH_ACTUAL);
+ outdir.mkdirs();
+ // remove library directory
+ TestLibrarian.removeLibraryDir();
+ testExecutor.setLibrarian(new TestLibrarian());
- ExecutionTestUtil.setUp();
++ ExecutionTestUtil.setUp(cleanupOnStart);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw th;
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ // remove library directory
+ TestLibrarian.removeLibraryDir();
- ExecutionTestUtil.tearDown();
++ ExecutionTestUtil.tearDown(cleanupOnStop);
+ }
+
+ @Parameters(name = "ExecutionTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ Collection<Object[]> testArgs = buildTestsInXml(TestCaseContext.ONLY_TESTSUITE_XML_NAME);
+ if (testArgs.size() == 0) {
+ testArgs = buildTestsInXml(TestCaseContext.DEFAULT_TESTSUITE_XML_NAME);
+ }
+ return testArgs;
+ }
+
+ protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
+ Collection<Object[]> testArgs = new ArrayList<Object[]>();
+ TestCaseContext.Builder b = new TestCaseContext.Builder();
+ for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
+ testArgs.add(new Object[] { ctx });
+ }
+ return testArgs;
+
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public ExecutionTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, ExecutionTestUtil.FailedGroup);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 5e76ecb,0000000..d919c92
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@@ -1,112 -1,0 +1,112 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.IdentitiyResolverFactory;
+import org.apache.asterix.testframework.xml.TestGroup;
+import org.apache.asterix.testframework.xml.TestSuite;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+
+public class ExecutionTestUtil {
+
+ protected static final Logger LOGGER = Logger.getLogger(ExecutionTest.class.getName());
+
+ protected static final String PATH_ACTUAL = "rttest" + File.separator;
+
+ protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+
+ protected static TestGroup FailedGroup;
+
- public static void setUp() throws Exception {
++ public static void setUp(boolean cleanup) throws Exception {
+ System.out.println("Starting setup");
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Starting setup");
+ }
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("initializing pseudo cluster");
+ }
- AsterixHyracksIntegrationUtil.init(true);
++ AsterixHyracksIntegrationUtil.init(cleanup);
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("initializing HDFS");
+ }
+
+ HDFSCluster.getInstance().setup();
+
+ // Set the node resolver to be the identity resolver that expects node
+ // names
+ // to be node controller ids; a valid assumption in test environment.
+ System.setProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY,
+ IdentitiyResolverFactory.class.getName());
+
+ FailedGroup = new TestGroup();
+ FailedGroup.setName("failed");
+ }
+
+ private static void validateBufferCacheState() {
+ for (NodeControllerService nc : AsterixHyracksIntegrationUtil.ncs) {
+ IAsterixAppRuntimeContext appCtx = (IAsterixAppRuntimeContext) nc.getApplicationContext()
+ .getApplicationObject();
+ if (!((BufferCache) appCtx.getBufferCache()).isClean()) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+
- public static void tearDown() throws Exception {
++ public static void tearDown(boolean cleanup) throws Exception {
+ // validateBufferCacheState(); <-- Commented out until bug is fixed -->
- AsterixHyracksIntegrationUtil.deinit(true);
++ AsterixHyracksIntegrationUtil.deinit(cleanup);
+ File outdir = new File(PATH_ACTUAL);
+ File[] files = outdir.listFiles();
+ if (files == null || files.length == 0) {
+ outdir.delete();
+ }
+ HDFSCluster.getInstance().cleanup();
+
+ if (FailedGroup != null && FailedGroup.getTestCase().size() > 0) {
+ File temp = File.createTempFile("failed", ".xml");
+ javax.xml.bind.JAXBContext jaxbCtx = null;
+ jaxbCtx = javax.xml.bind.JAXBContext.newInstance(TestSuite.class.getPackage().getName());
+ javax.xml.bind.Marshaller marshaller = null;
+ marshaller = jaxbCtx.createMarshaller();
+ marshaller.setProperty(javax.xml.bind.Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+ TestSuite failedSuite = new TestSuite();
+ failedSuite.setResultOffsetPath("results");
+ failedSuite.setQueryOffsetPath("queries");
+ failedSuite.getTestGroup().add(FailedGroup);
+ marshaller.marshal(failedSuite, temp);
+ System.err.println("The failed.xml is written to :" + temp.getAbsolutePath()
+ + ". You can copy it to only.xml by the following cmd:" + "\rcp " + temp.getAbsolutePath() + " "
+ + Paths.get("./src/test/resources/runtimets/only.xml").toAbsolutePath());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
index cbb14c5,0000000..b827a0d
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
@@@ -1,100 -1,0 +1,102 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.logging.Logger;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.test.aql.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.TestGroup;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the runtime test cases under 'asterix-app/src/test/resources/runtimets'.
+ * The parameter list is built from "only_sqlpp.xml" when that suite yields any test case and from
+ * "testsuite_sqlpp.xml" otherwise; every test case context becomes one parameterized run.
+ */
+@RunWith(Parameterized.class)
+public class SqlppExecutionTest {
+
+ protected static final Logger LOGGER = Logger.getLogger(SqlppExecutionTest.class.getName());
+
+ // Directory created in setUp() and handed to the test executor as the "actual" path.
+ protected static final String PATH_ACTUAL = "rttest" + File.separator;
+ // Base directory from which the suite XML files are loaded.
+ protected static final String PATH_BASE = StringUtils.join(new String[] { "src", "test", "resources", "runtimets" },
+ File.separator);
+
+ protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+
+ protected static AsterixTransactionProperties txnProperties;
- private final static TestExecutor testExecutor = new TestExecutor();
++ private static final TestExecutor testExecutor = new TestExecutor();
+ // Flags forwarded to ExecutionTestUtil.setUp(...) and ExecutionTestUtil.tearDown(...) respectively.
++ private static final boolean cleanupOnStart = true;
++ private static final boolean cleanupOnStop = true;
+
+ // Not assigned anywhere in this class; test() passes it to the executor as-is.
+ protected static TestGroup FailedGroup;
+
+ /**
+ * Creates the PATH_ACTUAL directory and delegates to ExecutionTestUtil.setUp(cleanupOnStart).
+ */
+ @BeforeClass
+ public static void setUp() throws Exception {
+ File outdir = new File(PATH_ACTUAL);
+ outdir.mkdirs();
- ExecutionTestUtil.setUp();
++ ExecutionTestUtil.setUp(cleanupOnStart);
+ }
+
+ /**
+ * Delegates to ExecutionTestUtil.tearDown(cleanupOnStop) and then removes the test storage files.
+ */
+ @AfterClass
+ public static void tearDown() throws Exception {
- ExecutionTestUtil.tearDown();
++ ExecutionTestUtil.tearDown(cleanupOnStop);
+ AsterixHyracksIntegrationUtil.removeTestStorageFiles();
+ }
+
+ /**
+ * Supplies the JUnit parameters: the contents of "only_sqlpp.xml" if it produces at least one
+ * test case, otherwise the contents of "testsuite_sqlpp.xml".
+ */
+ @Parameters(name = "SqlppExecutionTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ Collection<Object[]> testArgs = buildTestsInXml("only_sqlpp.xml");
+ if (testArgs.size() == 0) {
+ testArgs = buildTestsInXml("testsuite_sqlpp.xml");
+ }
+ return testArgs;
+ }
+
+ /**
+ * Builds one single-element parameter array per test case context found in the given suite file,
+ * resolved against PATH_BASE.
+ *
+ * @param xmlfile name of the suite XML file
+ * @return the parameter arrays, each holding one TestCaseContext
+ */
+ protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
+ Collection<Object[]> testArgs = new ArrayList<Object[]>();
+ TestCaseContext.Builder b = new TestCaseContext.Builder();
+ for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
+ testArgs.add(new Object[] { ctx });
+ }
+ return testArgs;
+
+ }
+
+ // Test case context for this parameterized instance.
+ protected TestCaseContext tcCtx;
+
+ public SqlppExecutionTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ /**
+ * Executes the test case of this instance, passing no ProcessBuilder and false for the
+ * DML-recovery flag.
+ */
+ @Test
+ public void test() throws Exception {
+ testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, FailedGroup);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
index d6cf231,0000000..7f7fbb4
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
@@@ -1,181 -1,0 +1,182 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.sqlpp;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
+import org.apache.asterix.lang.common.base.IQueryRewriter;
+import org.apache.asterix.lang.common.base.IRewriterFactory;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.base.Statement.Kind;
+import org.apache.asterix.lang.common.rewrites.LangRewritingContext;
+import org.apache.asterix.lang.common.statement.DataverseDecl;
+import org.apache.asterix.lang.common.statement.FunctionDecl;
+import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.lang.sqlpp.parser.SqlppParserFactory;
+import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
+import org.apache.asterix.lang.sqlpp.util.SqlppAstPrintUtil;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.test.aql.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.context.TestFileContext;
+import org.apache.asterix.testframework.xml.TestCase.CompilationUnit;
+import org.apache.asterix.testframework.xml.TestGroup;
+
+import junit.extensions.PA;
+
+public class ParserTestExecutor extends TestExecutor {
+
+ private IParserFactory sqlppParserFactory = new SqlppParserFactory();
+ private IRewriterFactory sqlppRewriterFactory = new SqlppRewriterFactory();
+
+ /**
+ * Runs the SQL++ parser test for every test file of every compilation unit in the test case.
+ * An exception raised for a file is tolerated when the compilation unit declares an expected error;
+ * otherwise the test case is added to {@code failedGroup} (when non-null) and a wrapping exception
+ * is thrown.
+ *
+ * @param actualPath directory under which the actual result files are placed
+ * @param testCaseCtx the test case to execute
+ * @param pb not used by this override
+ * @param isDmlRecoveryTest not used by this override
+ * @param failedGroup collects unexpectedly failing test cases; may be null
+ * @throws Exception if a test file fails and no error was expected for its compilation unit
+ */
+ @Override
+ public void executeTest(String actualPath, TestCaseContext testCaseCtx, ProcessBuilder pb,
+ boolean isDmlRecoveryTest, TestGroup failedGroup) throws Exception {
+ // NOTE(review): queryCount is declared once and never reset, yet it indexes the per-compilation-unit
+ // list of expected result files; confirm this is intended for test cases with several units.
+ int queryCount = 0;
+ List<CompilationUnit> cUnits = testCaseCtx.getTestCase().getCompilationUnit();
+ for (CompilationUnit cUnit : cUnits) {
+ LOGGER.info(
+ "Starting [TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " ... ");
+ List<TestFileContext> testFileCtxs = testCaseCtx.getTestFiles(cUnit);
+ List<TestFileContext> expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit);
+ for (TestFileContext ctx : testFileCtxs) {
+ File testFile = ctx.getFile();
+ try {
+ // Each test file consumes the expected result file at index queryCount.
+ if (queryCount >= expectedResultFileCtxs.size()) {
+ throw new IllegalStateException("no result file for " + testFile.toString() + "; queryCount: "
+ + queryCount + ", filectxs.size: " + expectedResultFileCtxs.size());
+ }
+
+ // Runs the test query.
+ File actualResultFile = testCaseCtx.getActualResultFile(cUnit, new File(actualPath));
+ File expectedResultFile = expectedResultFileCtxs.get(queryCount).getFile();
+ testSQLPPParser(testFile, actualResultFile, expectedResultFile);
+
+ LOGGER.info(
+ "[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " PASSED ");
+ queryCount++;
+ } catch (Exception e) {
+ System.err.println("testFile " + testFile.toString() + " raised an exception:");
+ e.printStackTrace();
+ // An empty expected-error value means the failure is genuine.
+ if (cUnit.getExpectedError().isEmpty()) {
+ System.err.println("...Unexpected!");
+ if (failedGroup != null) {
+ failedGroup.getTestCase().add(testCaseCtx.getTestCase());
+ }
+ throw new Exception("Test \"" + testFile + "\" FAILED!", e);
+ } else {
+ // Expected failure: log it and continue with the next test file.
+ LOGGER.info("[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName()
+ + " failed as expected: " + e.getMessage());
+ System.err.println("...but that was expected.");
+ }
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Tests the SQL++ parser: parses {@code queryFile}, rewrites every query statement, prints the AST of
+ * each statement to {@code actualResultFile} and compares that file with {@code expectedFile}.
+ *
+ * @param queryFile the SQL++ source to parse
+ * @param actualResultFile the file the printed AST is written to; its parent directories are created
+ * @param expectedFile the file holding the expected print-out
+ * @throws Exception if reading, parsing, rewriting, printing or the comparison fails
+ */
+ public void testSQLPPParser(File queryFile, File actualResultFile, File expectedFile) throws Exception {
+ actualResultFile.getParentFile().mkdirs();
+ IParser parser = sqlppParserFactory.createParser(readTestFile(queryFile));
+ GlobalConfig.ASTERIX_LOGGER.info(queryFile.toString());
+ // Opened directly before the try block whose finally clause closes it, so a failure while reading
+ // the query file or creating the parser can no longer leave the output stream open.
+ PrintWriter writer = new PrintWriter(new FileOutputStream(actualResultFile));
+ try {
+ List<Statement> statements = parser.parse();
+ List<FunctionDecl> functions = getDeclaredFunctions(statements);
+ String dvName = getDefaultDataverse(statements);
+ AqlMetadataProvider aqlMetadataProvider = mock(AqlMetadataProvider.class);
+
+ // The metadata provider is mocked: it only reports the default dataverse and a config map that
+ // answers "true" for FunctionUtil.IMPORT_PRIVATE_FUNCTIONS.
+ @SuppressWarnings("unchecked")
+ Map<String, String> config = mock(Map.class);
+ when(aqlMetadataProvider.getDefaultDataverseName()).thenReturn(dvName);
+ when(aqlMetadataProvider.getConfig()).thenReturn(config);
+ when(config.get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS)).thenReturn("true");
+
+ for (Statement st : statements) {
+ if (st.getKind() == Kind.QUERY) {
+ Query query = (Query) st;
+ IQueryRewriter rewriter = sqlppRewriterFactory.createQueryRewriter();
+ rewrite(rewriter, functions, query, aqlMetadataProvider,
+ new LangRewritingContext(query.getVarCounter()));
+ }
+ SqlppAstPrintUtil.print(st, writer);
+ }
+ // Close before comparing so the complete print-out is on disk; the close in the finally clause
+ // then has no further effect on this path.
+ writer.close();
+ // Compares the actual result and the expected result.
+ runScriptAndCompareWithResult(queryFile, new PrintWriter(System.err), expectedFile, actualResultFile);
+ } catch (Exception e) {
+ GlobalConfig.ASTERIX_LOGGER.warning("Failed while testing file " + queryFile);
+ throw e;
+ } finally {
+ writer.close();
+ }
+ }
+
+ // Extracts declared functions: returns, in input order, every statement whose kind is
+ // FUNCTION_DECL, cast to FunctionDecl. The result is empty when there is none.
+ private List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
+ List<FunctionDecl> functionDecls = new ArrayList<FunctionDecl>();
+ for (Statement st : statements) {
+ if (st.getKind().equals(Statement.Kind.FUNCTION_DECL)) {
+ functionDecls.add((FunctionDecl) st);
+ }
+ }
+ return functionDecls;
+ }
+
+ // Gets the default dataverse for the input statements: the dataverse name of the first
+ // DATAVERSE_DECL statement, or null when the statements contain no such declaration.
+ private String getDefaultDataverse(List<Statement> statements) {
+ for (Statement st : statements) {
+ if (st.getKind().equals(Statement.Kind.DATAVERSE_DECL)) {
+ DataverseDecl dv = (DataverseDecl) st;
+ return dv.getDataverseName().getValue();
+ }
+ }
+ return null;
+ }
+
+ // Rewrite queries.
+ // Note: we do not do inline function rewriting here because this needs real
+ // metadata access.
+ // The individual rewriting steps are invoked on the rewriter by method-signature string through
+ // PA.invokeMethod, in this order: setup, inlineColumnAlias, rewriteGlobalAggregations,
+ // rewriteGroupBys, variableCheckAndRewrite(true).
+ // NOTE(review): presumably these steps are not publicly accessible on the rewriter, hence the
+ // signature-based invocation; confirm against the rewriter class.
+ private void rewrite(IQueryRewriter rewriter, List<FunctionDecl> declaredFunctions, Query topExpr,
+ AqlMetadataProvider metadataProvider, LangRewritingContext context) throws AsterixException {
+ PA.invokeMethod(rewriter,
+ "setup(java.util.List, org.apache.asterix.lang.common.statement.Query, org.apache.asterix.metadata.declared.AqlMetadataProvider, "
+ + "org.apache.asterix.lang.common.rewrites.LangRewritingContext)",
+ declaredFunctions, topExpr, metadataProvider, context);
+ PA.invokeMethod(rewriter, "inlineColumnAlias()");
++ PA.invokeMethod(rewriter, "rewriteGlobalAggregations()");
+ PA.invokeMethod(rewriter, "rewriteGroupBys()");
+ PA.invokeMethod(rewriter, "variableCheckAndRewrite(boolean)", Boolean.TRUE);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/count-tweets.sqlpp
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/count-tweets.sqlpp
index a2ddf4b,0000000..0bea252
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/count-tweets.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/count-tweets.sqlpp
@@@ -1,43 -1,0 +1,43 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* Drops and recreates the twitter database, then makes it the current one. */
+drop database twitter if exists;
+create database twitter;
+
+use twitter;
+
+
+create type twitter.Tweet as
+{
+ id : int32,
+ tweetid : int64,
+ loc : point,
+ time : datetime,
+ text : string
+}
+
+/* External table of Tweet records read from a local ADM file on asterix_nc1. */
+create external table TwitterData(Tweet) using localfs(("path"="asterix_nc1://data/twitter/smalltweets.txt"),("format"="adm"));
+
+/* Splits each tweet text into word tokens, groups by token and emits every word with its count. */
+write output to asterix_nc1:"/tmp/count-tweets.adm"
- select element {'word':tok,'count':twitter.count(token)}
++select element {'word':tok,'count':count(token)}
+from TwitterData as t,
+ tokens as token
+with tokens as twitter."word-tokens"(t.text)
+group by token as tok
+;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/denorm-cust-order.sqlpp
----------------------------------------------------------------------
diff --cc asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/denorm-cust-order.sqlpp
index b5fc4ea,0000000..d8e2b7a
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/denorm-cust-order.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/denorm-cust-order.sqlpp
@@@ -1,70 -1,0 +1,70 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* Drops and recreates the custorder database, then makes it the current one. */
+drop database custorder if exists;
+create database custorder;
+
+use custorder;
+
+
+create type custorder.AddressType as
+ closed {
+ number : int32,
+ street : string,
+ city : string
+}
+
+create type custorder.CustomerType as
+ closed {
+ cid : int32,
+ name : string,
+ age : int32?,
+ address : AddressType?,
+ lastorder : {
+ oid : int32,
+ total : float
+ }
+
+}
+
+create type custorder.OrderType as
+ closed {
+ oid : int32,
+ cid : int32,
+ orderstatus : string,
+ orderpriority : string,
+ clerk : string,
+ total : float
+}
+
+/* Both tables below are placed on node group group1 (asterix_nc1 and asterix_nc2). */
- create nodegroup group1 if not exists on
++create nodegroup group1 if not exists on
+ asterix_nc1,
+ asterix_nc2
+;
+create table Customers(CustomerType) primary key cid on group1;
+
+create table Orders(OrderType) primary key oid on group1;
+
+/* Joins customers with their orders on cid, groups by customer id and emits the order count per group. */
+/* NOTE(review): 'cust' is not bound by any clause visible in this query; confirm this is intended. */
+write output to asterix_nc1:"/tmp/custorder.adm"
- select element {'cid':cid,'cust':cust,'cnt-orders':custorder.count(o),'orders':o}
++select element {'cid':cid,'cust':cust,'cnt-orders':count(o),'orders':o}
+from Customers as c,
+ Orders as o
+where (c.cid = o.cid)
+group by c.cid as cid
+;