You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/01/01 02:17:04 UTC
[3/3] incubator-asterixdb git commit: Divide Cluster into Unique
Partitions
Divide Cluster into Unique Partitions
The change includes the following:
- Fix passing NC stores to AsterixConfiguration.
- Unify storage direcotry name in the instance level rather than the node level.
- Divide the cluster into unique storage partitions based on the number of stores.
- Refactored FileSplits and moved out of AqlMetadataProvider.
- Make AsterixHyracksIntegrationUtil use the passed configuration file.
- Make File Splits pass relative index paths of partitions rather than absolute paths.
- Remove unused AqlCompiledMetadataDeclarations class.
Change-Id: I8c7fbca5113dd7ad569a46dfa2591addb5bf8655
Reviewed-on: https://asterix-gerrit.ics.uci.edu/564
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <bu...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/1d5cf640
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/1d5cf640
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/1d5cf640
Branch: refs/heads/master
Commit: 1d5cf6403c5c8a2a0b5bf4c8f64b34ecbf1ccc30
Parents: e3e1373
Author: Murtadha Hubail <mh...@uci.edu>
Authored: Thu Dec 31 08:38:15 2015 -0800
Committer: Murtadha Hubail <hu...@gmail.com>
Committed: Thu Dec 31 17:12:53 2015 -0800
----------------------------------------------------------------------
.../api/common/AsterixAppRuntimeContext.java | 6 +-
.../common/AsterixHyracksIntegrationUtil.java | 67 +++--
.../asterix/aql/translator/QueryTranslator.java | 1 -
.../bootstrap/NCApplicationEntryPoint.java | 39 +--
.../resources/asterix-build-configuration.xml | 4 +-
.../asterix/test/metadata/MetadataTest.java | 5 -
.../asterix/test/runtime/ExecutionTest.java | 4 -
.../test/runtime/SqlppExecutionTest.java | 5 +-
.../common/cluster/ClusterPartition.java | 77 ++++++
.../config/AsterixMetadataProperties.java | 15 +-
.../config/AsterixPropertiesAccessor.java | 38 ++-
.../common/context/DatasetLifecycleManager.java | 42 +--
...erixLSMInsertDeleteOperatorNodePushable.java | 2 +-
.../src/main/resources/schema/cluster.xsd | 1 -
.../src/main/resources/schema/yarn_cluster.xsd | 3 -
.../apache/asterix/test/aql/TestExecutor.java | 52 ++--
.../asterix/event/driver/EventDriver.java | 2 +-
.../asterix/event/management/EventUtil.java | 2 +-
.../event/service/AsterixEventServiceUtil.java | 85 +++---
.../asterix/event/util/PatternCreator.java | 44 ++-
.../adapter/factory/HDFSAdapterFactory.java | 111 ++++----
.../factory/HDFSIndexingAdapterFactory.java | 6 +-
...alDatasetIndexesAbortOperatorDescriptor.java | 5 +-
...lDatasetIndexesCommitOperatorDescriptor.java | 10 +-
...DatasetIndexesRecoverOperatorDescriptor.java | 4 +-
.../installer/command/ValidateCommand.java | 22 +-
.../asterix/installer/driver/InstallerUtil.java | 13 +-
.../metadata/bootstrap/MetadataBootstrap.java | 34 ++-
.../AqlCompiledMetadataDeclarations.java | 276 -------------------
.../declared/AqlLogicalPlanAndMetadataImpl.java | 16 +-
.../metadata/declared/AqlMetadataProvider.java | 164 +----------
.../utils/SplitsAndConstraintsUtil.java | 173 ++++++++++++
.../om/util/AsterixClusterProperties.java | 112 ++++++--
...dexModificationOperationCallbackFactory.java | 4 +-
...dexModificationOperationCallbackFactory.java | 4 +-
...dexModificationOperationCallbackFactory.java | 4 +-
...dexModificationOperationCallbackFactory.java | 4 +-
.../PersistentLocalResourceFactory.java | 4 +-
.../PersistentLocalResourceRepository.java | 108 ++++----
.../service/recovery/RecoveryManager.java | 38 ++-
.../asterix/aoya/AsterixApplicationMaster.java | 25 +-
.../apache/asterix/aoya/AsterixYARNClient.java | 154 ++++++-----
42 files changed, 844 insertions(+), 941 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 45c0598..4a8a323 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -187,7 +187,8 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
String nodeId = ncApplicationContext.getNodeId();
replicaResourcesManager = new ReplicaResourcesManager(ioManager.getIODevices(),
- metadataProperties.getStores().get(nodeId)[0], nodeId, replicationProperties.getReplicationStore());
+ AsterixClusterProperties.INSTANCE.getStorageDirectoryName(), nodeId,
+ replicationProperties.getReplicationStore());
replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
@@ -377,7 +378,6 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
@Override
public void initializeResourceIdFactory() throws HyracksDataException {
- resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext)
- .createResourceIdFactory();
+ resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 0145651..d7842e8 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -19,11 +19,10 @@
package org.apache.asterix.api.common;
import java.io.File;
-import java.io.IOException;
import java.util.EnumSet;
+import java.util.Set;
import org.apache.asterix.common.config.AsterixPropertiesAccessor;
-import org.apache.asterix.common.config.AsterixTransactionProperties;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
@@ -41,22 +40,18 @@ import org.apache.hyracks.control.nc.NodeControllerService;
public class AsterixHyracksIntegrationUtil {
private static final String IO_DIR_KEY = "java.io.tmpdir";
- public static final int NODES = 2;
- public static final int PARTITONS = 2;
-
public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
-
public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
public static ClusterControllerService cc;
- public static NodeControllerService[] ncs = new NodeControllerService[NODES];
+ public static NodeControllerService[] ncs;
public static IHyracksClientConnection hcc;
- protected static AsterixTransactionProperties txnProperties;
+ private static AsterixPropertiesAccessor propertiesAccessor;
public static void init(boolean deleteOldInstanceData) throws Exception {
- AsterixPropertiesAccessor apa = new AsterixPropertiesAccessor();
- txnProperties = new AsterixTransactionProperties(apa);
+ propertiesAccessor = new AsterixPropertiesAccessor();
+ ncs = new NodeControllerService[propertiesAccessor.getNodeNames().size()];
if (deleteOldInstanceData) {
deleteTransactionLogs();
removeTestStorageFiles();
@@ -77,7 +72,8 @@ public class AsterixHyracksIntegrationUtil {
// Starts ncs.
int n = 0;
- for (String ncName : getNcNames()) {
+ Set<String> nodes = propertiesAccessor.getNodeNames();
+ for (String ncName : nodes) {
NCConfig ncConfig1 = new NCConfig();
ncConfig1.ccHost = "localhost";
ncConfig1.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
@@ -87,13 +83,28 @@ public class AsterixHyracksIntegrationUtil {
ncConfig1.nodeId = ncName;
ncConfig1.resultTTL = 30000;
ncConfig1.resultSweepThreshold = 1000;
- for (int p = 0; p < PARTITONS; ++p) {
+ String tempPath = System.getProperty(IO_DIR_KEY);
+ if (tempPath.endsWith(File.separator)) {
+ tempPath = tempPath.substring(0, tempPath.length() - 1);
+ }
+ //get initial partitions from properties
+ String[] nodeStores = propertiesAccessor.getStores().get(ncName);
+ if (nodeStores == null) {
+ throw new Exception("Coudn't find stores for NC: " + ncName);
+ }
+ String tempDirPath = System.getProperty(IO_DIR_KEY);
+ if (!tempDirPath.endsWith(File.separator)) {
+ tempDirPath += File.separator;
+ }
+ for (int p = 0; p < nodeStores.length; p++) {
+ //create IO devices based on stores
+ String iodevicePath = tempDirPath + ncConfig1.nodeId + File.separator + nodeStores[p];
+ File ioDeviceDir = new File(iodevicePath);
+ ioDeviceDir.mkdirs();
if (p == 0) {
- ncConfig1.ioDevices = System.getProperty("java.io.tmpdir") + File.separator + ncConfig1.nodeId
- + "/iodevice" + p;
+ ncConfig1.ioDevices = iodevicePath;
} else {
- ncConfig1.ioDevices += "," + System.getProperty("java.io.tmpdir") + File.separator
- + ncConfig1.nodeId + "/iodevice" + p;
+ ncConfig1.ioDevices += "," + iodevicePath;
}
}
ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
@@ -105,19 +116,7 @@ public class AsterixHyracksIntegrationUtil {
}
public static String[] getNcNames() {
- String[] names = new String[NODES];
- for (int n = 0; n < NODES; ++n) {
- names[n] = "asterix_nc" + (n + 1);
- }
- return names;
- }
-
- public static String[] getDataDirs() {
- String[] names = new String[NODES];
- for (int n = 0; n < NODES; ++n) {
- names[n] = "asterix_nc" + (n + 1) + "data";
- }
- return names;
+ return propertiesAccessor.getNodeNames().toArray(new String[propertiesAccessor.getNodeNames().size()]);
}
public static IHyracksClientConnection getHyracksClientConnection() {
@@ -147,17 +146,17 @@ public class AsterixHyracksIntegrationUtil {
hcc.waitForCompletion(jobId);
}
- private static void removeTestStorageFiles() throws IOException {
+ public static void removeTestStorageFiles() {
File dir = new File(System.getProperty(IO_DIR_KEY));
- for (String ncName : AsterixHyracksIntegrationUtil.getNcNames()) {
+ for (String ncName : propertiesAccessor.getNodeNames()) {
File ncDir = new File(dir, ncName);
FileUtils.deleteQuietly(ncDir);
}
}
private static void deleteTransactionLogs() throws Exception {
- for (String ncId : AsterixHyracksIntegrationUtil.getNcNames()) {
- File log = new File(txnProperties.getLogDirectory(ncId));
+ for (String ncId : propertiesAccessor.getNodeNames()) {
+ File log = new File(propertiesAccessor.getTransactionLogDirs().get(ncId));
if (log.exists()) {
FileUtils.deleteDirectory(log);
}
@@ -185,7 +184,7 @@ public class AsterixHyracksIntegrationUtil {
try {
System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration.xml");
- init(false);
+ init(true);
while (true) {
Thread.sleep(10000);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 714e05c..08b92e7 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -1246,7 +1246,6 @@ public class QueryTranslator extends AbstractLangTranslator {
}
}
jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
-
//#. mark PendingDropOp on the dataverse record by
// first, deleting the dataverse record from the DATAVERSE_DATASET
// second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 147a356..496c2f8 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -46,11 +46,11 @@ import org.apache.asterix.metadata.MetadataNode;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
import org.apache.asterix.metadata.api.IMetadataNode;
import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.replication.storage.AsterixFilesUtil;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
+import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.application.INCApplicationEntryPoint;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
@@ -202,7 +202,6 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
.getMetadataProperties();
-
if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("System state: " + SystemState.NEW_UNIVERSE);
@@ -213,7 +212,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
.getLocalResourceRepository();
- localResourceRepository.initializeNewUniverse(metadataProperties.getStores().get(nodeId)[0]);
+ localResourceRepository.initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
}
IAsterixStateProxy proxy = null;
@@ -277,22 +276,18 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
performLocalCleanUp();
}
- private void performLocalCleanUp() throws IOException {
+ private void performLocalCleanUp() {
//delete working area files from failed jobs
runtimeContext.getIOManager().deleteWorkspaceFiles();
//reclaim storage for temporary datasets.
- PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
- .getLocalResourceRepository();
-
- String[] storageMountingPoints = localResourceRepository.getStorageMountingPoints();
- String storageFolderName = ((IAsterixPropertiesProvider) runtimeContext).getMetadataProperties().getStores()
- .get(nodeId)[0];
-
- for (String mountPoint : storageMountingPoints) {
- String tempDatasetFolder = mountPoint + storageFolderName + File.separator
- + AqlMetadataProvider.TEMP_DATASETS_STORAGE_FOLDER;
- AsterixFilesUtil.deleteFolder(tempDatasetFolder);
+ //get node stores
+ String[] nodeStores = ((IAsterixPropertiesProvider) runtimeContext).getMetadataProperties().getStores()
+ .get(nodeId);
+ for (String store : nodeStores) {
+ String tempDatasetFolder = store + File.separator
+ + SplitsAndConstraintsUtil.TEMP_DATASETS_STORAGE_FOLDER;
+ FileUtils.deleteQuietly(new File(tempDatasetFolder));
}
// TODO
@@ -309,7 +304,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
if (cluster == null) {
throw new IllegalStateException("No cluster configuration found for this instance");
}
- String asterixInstanceName = cluster.getInstanceName();
+ String asterixInstanceName = metadataProperties.getInstanceName();
AsterixTransactionProperties txnProperties = ((IAsterixPropertiesProvider) runtimeContext)
.getTransactionProperties();
Node self = null;
@@ -322,8 +317,14 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
for (Node node : nodes) {
String ncId = asterixInstanceName + "_" + node.getId();
if (ncId.equalsIgnoreCase(nodeId)) {
- String storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
- metadataProperties.getStores().put(nodeId, storeDir.split(","));
+ String storeDir = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+ String nodeIoDevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
+ String[] ioDevicePaths = nodeIoDevices.trim().split(",");
+ for (int i = 0; i < ioDevicePaths.length; i++) {
+ //construct full store path
+ ioDevicePaths[i] += File.separator + storeDir;
+ }
+ metadataProperties.getStores().put(nodeId, ioDevicePaths);
String coredumpPath = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
metadataProperties.getCoredumpPaths().put(nodeId, coredumpPath);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/resources/asterix-build-configuration.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index fa20099..731113b 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -20,11 +20,11 @@
<metadataNode>asterix_nc1</metadataNode>
<store>
<ncId>asterix_nc1</ncId>
- <storeDirs>asterix_nc1data</storeDirs>
+ <storeDirs>iodevice0,iodevice1</storeDirs>
</store>
<store>
<ncId>asterix_nc2</ncId>
- <storeDirs>asterix_nc2data</storeDirs>
+ <storeDirs>iodevice0,iodevice1</storeDirs>
</store>
<transactionLogDir>
<ncId>asterix_nc1</ncId>
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java b/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
index 376b2ff..6c6e411 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
@@ -65,11 +65,6 @@ public class MetadataTest {
if (files == null || files.length == 0) {
outdir.delete();
}
-
- // clean up the files written by the ASTERIX storage manager
- for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
- testExecutor.deleteRec(new File(d));
- }
}
@Parameters
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
index 052e0be..7a55c90 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
@@ -68,10 +68,6 @@ public class ExecutionTest {
@AfterClass
public static void tearDown() throws Exception {
ExecutionTestUtil.tearDown();
- // clean up the files written by the ASTERIX storage manager
- for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
- testExecutor.deleteRec(new File(d));
- }
}
@Parameters
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
index d5f4db3..22a3ad7 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
@@ -65,10 +65,7 @@ public class SqlppExecutionTest {
@AfterClass
public static void tearDown() throws Exception {
ExecutionTestUtil.tearDown();
- // clean up the files written by the ASTERIX storage manager
- for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
- testExecutor.deleteRec(new File(d));
- }
+ AsterixHyracksIntegrationUtil.removeTestStorageFiles();
}
@Parameters
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
new file mode 100644
index 0000000..6cd44a7
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+public class ClusterPartition implements Cloneable {
+ private final int partitionId;
+ private final String nodeId;
+ private final int ioDeviceNum;
+ private String activeNodeId = null;
+ private boolean active = false;
+
+ public ClusterPartition(int partitionId, String nodeId, int ioDeviceNum) {
+ this.partitionId = partitionId;
+ this.nodeId = nodeId;
+ this.ioDeviceNum = ioDeviceNum;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public int getIODeviceNum() {
+ return ioDeviceNum;
+ }
+
+ public String getActiveNodeId() {
+ return activeNodeId;
+ }
+
+ public void setActiveNodeId(String activeNodeId) {
+ this.activeNodeId = activeNodeId;
+ }
+
+ public void setActive(boolean active) {
+ this.active = active;
+ }
+
+ @Override
+ public ClusterPartition clone() {
+ ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+ return clone;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("ID:" + partitionId);
+ sb.append(" Original Node: " + nodeId);
+ sb.append(" IODevice: " + ioDeviceNum);
+ sb.append(" Active Node: " + activeNodeId);
+ return sb.toString();
+ }
+
+ public boolean isActive() {
+ return active;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index adaca46..8e2c4e7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -20,6 +20,9 @@ package org.apache.asterix.common.config;
import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
public class AsterixMetadataProperties extends AbstractAsterixProperties {
@@ -35,8 +38,8 @@ public class AsterixMetadataProperties extends AbstractAsterixProperties {
return accessor.getMetadataNodeName();
}
- public String getMetadataStore() {
- return accessor.getMetadataStore();
+ public ClusterPartition getMetadataPartition() {
+ return accessor.getMetadataPartiton();
}
public Map<String, String[]> getStores() {
@@ -54,4 +57,12 @@ public class AsterixMetadataProperties extends AbstractAsterixProperties {
public Map<String, String> getCoredumpPaths() {
return accessor.getCoredumpConfig();
}
+
+ public Map<String, ClusterPartition[]> getNodePartitions() {
+ return accessor.getNodePartitions();
+ }
+
+ public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
+ return accessor.getClusterPartitions();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index d6c81ab..cc7ec84 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -28,6 +28,8 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -35,6 +37,7 @@ import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
+import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.configuration.AsterixConfiguration;
import org.apache.asterix.common.configuration.Coredump;
import org.apache.asterix.common.configuration.Property;
@@ -53,12 +56,15 @@ public class AsterixPropertiesAccessor {
private final Map<String, Property> asterixConfigurationParams;
private final Map<String, String> transactionLogDirs;
private final Map<String, String> asterixBuildProperties;
+ private final Map<String, ClusterPartition[]> nodePartitionsMap;
+ private SortedMap<Integer, ClusterPartition> clusterPartitions;
public AsterixPropertiesAccessor() throws AsterixException {
String fileName = System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY);
if (fileName == null) {
fileName = GlobalConfig.DEFAULT_CONFIG_FILE_NAME;
}
+
InputStream is = this.getClass().getClassLoader().getResourceAsStream(fileName);
if (is == null) {
try {
@@ -82,9 +88,20 @@ public class AsterixPropertiesAccessor {
stores = new HashMap<String, String[]>();
List<Store> configuredStores = asterixConfiguration.getStore();
nodeNames = new HashSet<String>();
+ nodePartitionsMap = new HashMap<>();
+ clusterPartitions = new TreeMap<>();
+ int uniquePartitionId = 0;
for (Store store : configuredStores) {
String trimmedStoreDirs = store.getStoreDirs().trim();
- stores.put(store.getNcId(), trimmedStoreDirs.split(","));
+ String[] nodeStores = trimmedStoreDirs.split(",");
+ ClusterPartition[] nodePartitions = new ClusterPartition[nodeStores.length];
+ for (int i = 0; i < nodePartitions.length; i++) {
+ ClusterPartition partition = new ClusterPartition(uniquePartitionId++, store.getNcId(), i);
+ clusterPartitions.put(partition.getPartitionId(), partition);
+ nodePartitions[i] = partition;
+ }
+ stores.put(store.getNcId(), nodeStores);
+ nodePartitionsMap.put(store.getNcId(), nodePartitions);
nodeNames.add(store.getNcId());
}
asterixConfigurationParams = new HashMap<String, Property>();
@@ -116,10 +133,6 @@ public class AsterixPropertiesAccessor {
return metadataNodeName;
}
- public String getMetadataStore() {
- return stores.get(metadataNodeName)[0];
- }
-
public Map<String, String[]> getStores() {
return stores;
}
@@ -172,7 +185,7 @@ public class AsterixPropertiesAccessor {
}
}
- private <T> void logConfigurationError(Property p, T defaultValue) {
+ private static <T> void logConfigurationError(Property p, T defaultValue) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe("Invalid property value '" + p.getValue() + "' for property '" + p.getName()
+ "'.\n See the description: \n" + p.getDescription() + "\nDefault = " + defaultValue);
@@ -182,4 +195,17 @@ public class AsterixPropertiesAccessor {
public String getInstanceName() {
return instanceName;
}
+
+ public ClusterPartition getMetadataPartiton() {
+ //metadata partition is always the first partition on the metadata node
+ return nodePartitionsMap.get(metadataNodeName)[0];
+ }
+
+ public Map<String, ClusterPartition[]> getNodePartitions() {
+ return nodePartitionsMap;
+ }
+
+ public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
+ return clusterPartitions;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index adf1152..5062d06 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -78,9 +78,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
}
@Override
- public synchronized IIndex getIndex(String resourceName) throws HyracksDataException {
- int datasetID = getDIDfromResourceName(resourceName);
- long resourceID = getResourceIDfromResourceName(resourceName);
+ public synchronized IIndex getIndex(String resourcePath) throws HyracksDataException {
+ int datasetID = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
return getIndex(datasetID, resourceID);
}
@@ -98,9 +98,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
}
@Override
- public synchronized void register(String resourceName, IIndex index) throws HyracksDataException {
- int did = getDIDfromResourceName(resourceName);
- long resourceID = getResourceIDfromResourceName(resourceName);
+ public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
DatasetInfo dsInfo = datasetInfos.get(did);
if (dsInfo == null) {
dsInfo = getDatasetInfo(did);
@@ -116,16 +116,16 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index, dsInfo.datasetID, resourceID));
}
- public int getDIDfromResourceName(String resourceName) throws HyracksDataException {
- LocalResource lr = resourceRepository.getResourceByName(resourceName);
+ public int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
+ LocalResource lr = resourceRepository.getResourceByPath(resourcePath);
if (lr == null) {
return -1;
}
return ((ILocalResourceMetadata) lr.getResourceObject()).getDatasetID();
}
- public long getResourceIDfromResourceName(String resourceName) throws HyracksDataException {
- LocalResource lr = resourceRepository.getResourceByName(resourceName);
+ public long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException {
+ LocalResource lr = resourceRepository.getResourceByPath(resourcePath);
if (lr == null) {
return -1;
}
@@ -133,9 +133,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
}
@Override
- public synchronized void unregister(String resourceName) throws HyracksDataException {
- int did = getDIDfromResourceName(resourceName);
- long resourceID = getResourceIDfromResourceName(resourceName);
+ public synchronized void unregister(String resourcePath) throws HyracksDataException {
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
DatasetInfo dsInfo = datasetInfos.get(did);
IndexInfo iInfo = dsInfo.indexes.get(resourceID);
@@ -180,9 +180,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
}
@Override
- public synchronized void open(String resourceName) throws HyracksDataException {
- int did = getDIDfromResourceName(resourceName);
- long resourceID = getResourceIDfromResourceName(resourceName);
+ public synchronized void open(String resourcePath) throws HyracksDataException {
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
DatasetInfo dsInfo = datasetInfos.get(did);
if (dsInfo == null || !dsInfo.isRegistered) {
@@ -262,9 +262,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
}
@Override
- public synchronized void close(String resourceName) throws HyracksDataException {
- int did = getDIDfromResourceName(resourceName);
- long resourceID = getResourceIDfromResourceName(resourceName);
+ public synchronized void close(String resourcePath) throws HyracksDataException {
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
DatasetInfo dsInfo = datasetInfos.get(did);
if (dsInfo == null) {
@@ -704,9 +704,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
}
@Override
- public synchronized void allocateMemory(String resourceName) throws HyracksDataException {
+ public synchronized void allocateMemory(String resourcePath) throws HyracksDataException {
//a resource name in the case of DatasetLifecycleManager is a dataset id which is passed to the ResourceHeapBufferAllocator.
- int did = Integer.parseInt(resourceName);
+ int did = Integer.parseInt(resourcePath);
allocateDatasetMemory(did);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 51168ae..fd1ebb8 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -63,7 +63,7 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
try {
writer.open();
modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
- indexHelper.getResourceName(), indexHelper.getResourceID(), lsmIndex, ctx);
+ indexHelper.getResourcePath(), indexHelper.getResourceID(), lsmIndex, ctx);
indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
if (tupleFilterFactory != null) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/resources/schema/cluster.xsd
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index d3203d5..872c959 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -113,7 +113,6 @@
<xs:element ref="cl:java_home" minOccurs="0" />
<xs:element ref="cl:log_dir" minOccurs="0" />
<xs:element ref="cl:txn_log_dir" minOccurs="0" />
- <xs:element ref="cl:store" minOccurs="0" />
<xs:element ref="cl:iodevices" minOccurs="0" />
<xs:element ref="cl:debug_port" minOccurs="0" />
</xs:sequence>
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/resources/schema/yarn_cluster.xsd
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/resources/schema/yarn_cluster.xsd b/asterix-common/src/main/resources/schema/yarn_cluster.xsd
index f54cf90..8827985 100644
--- a/asterix-common/src/main/resources/schema/yarn_cluster.xsd
+++ b/asterix-common/src/main/resources/schema/yarn_cluster.xsd
@@ -138,9 +138,6 @@
ref="cl:txn_log_dir"
minOccurs="0" />
<xs:element
- ref="cl:store"
- minOccurs="0" />
- <xs:element
ref="cl:iodevices"
minOccurs="0" />
<xs:element
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index d04a3dd..d8147b6 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -71,7 +71,7 @@ public class TestExecutor {
this.port = 19002;
}
- public TestExecutor(String host, int port){
+ public TestExecutor(String host, int port) {
this.host = host;
this.port = port;
}
@@ -225,12 +225,16 @@ public class TestExecutor {
// In future this may be changed depending on the requested
// output format sent to the servlet.
String errorBody = method.getResponseBodyAsString();
- JSONObject result = new JSONObject(errorBody);
- String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
- result.getString("stacktrace") };
- GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
- throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
- + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
+ try {
+ JSONObject result = new JSONObject(errorBody);
+ String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
+ result.getString("stacktrace") };
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
+ throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
+ + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
+ } catch (Exception e) {
+ throw new Exception(errorBody);
+ }
}
return statusCode;
}
@@ -303,7 +307,7 @@ public class TestExecutor {
}
private InputStream getHandleResult(String handle, OutputFormat fmt) throws Exception {
- final String url = "http://"+host+":"+port+"/query/result";
+ final String url = "http://" + host + ":" + port + "/query/result";
// Create a method instance.
GetMethod method = new GetMethod(url);
@@ -430,9 +434,9 @@ public class TestExecutor {
switch (ctx.getType()) {
case "ddl":
if (ctx.getFile().getName().endsWith("aql")) {
- executeDDL(statement, "http://"+host+":"+port+"/ddl");
+ executeDDL(statement, "http://" + host + ":" + port + "/ddl");
} else {
- executeDDL(statement, "http://"+host+":"+port+"/ddl/sqlpp");
+ executeDDL(statement, "http://" + host + ":" + port + "/ddl/sqlpp");
}
break;
case "update":
@@ -442,9 +446,9 @@ public class TestExecutor {
"127.0.0.1://../../../../../../asterix-app/");
}
if (ctx.getFile().getName().endsWith("aql")) {
- executeUpdate(statement, "http://"+host+":"+port+"/update");
+ executeUpdate(statement, "http://" + host + ":" + port + "/update");
} else {
- executeUpdate(statement, "http://"+host+":"+port+"/update/sqlpp");
+ executeUpdate(statement, "http://" + host + ":" + port + "/update/sqlpp");
}
break;
case "query":
@@ -461,25 +465,25 @@ public class TestExecutor {
OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
if (ctx.getFile().getName().endsWith("aql")) {
if (ctx.getType().equalsIgnoreCase("query")) {
- resultStream = executeQuery(statement, fmt, "http://"+host+":"+port+"/query",
- cUnit.getParameter());
+ resultStream = executeQuery(statement, fmt,
+ "http://" + host + ":" + port + "/query", cUnit.getParameter());
} else if (ctx.getType().equalsIgnoreCase("async")) {
resultStream = executeAnyAQLAsync(statement, false, fmt,
- "http://"+host+":"+port+"/aql");
+ "http://" + host + ":" + port + "/aql");
} else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
resultStream = executeAnyAQLAsync(statement, true, fmt,
- "http://"+host+":"+port+"/aql");
+ "http://" + host + ":" + port + "/aql");
}
} else {
if (ctx.getType().equalsIgnoreCase("query")) {
- resultStream = executeQuery(statement, fmt, "http://"+host+":"+port+"/query/sqlpp",
- cUnit.getParameter());
+ resultStream = executeQuery(statement, fmt,
+ "http://" + host + ":" + port + "/query/sqlpp", cUnit.getParameter());
} else if (ctx.getType().equalsIgnoreCase("async")) {
resultStream = executeAnyAQLAsync(statement, false, fmt,
- "http://"+host+":"+port+"/sqlpp");
+ "http://" + host + ":" + port + "/sqlpp");
} else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
resultStream = executeAnyAQLAsync(statement, true, fmt,
- "http://"+host+":"+port+"/sqlpp");
+ "http://" + host + ":" + port + "/sqlpp");
}
}
@@ -506,7 +510,7 @@ public class TestExecutor {
break;
case "txnqbc": //qbc represents query before crash
resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
- "http://"+host+":"+port+"/query", cUnit.getParameter());
+ "http://" + host + ":" + port + "/query", cUnit.getParameter());
qbcFile = new File(actualPath + File.separator
+ testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
+ cUnit.getName() + "_qbc.adm");
@@ -515,7 +519,7 @@ public class TestExecutor {
break;
case "txnqar": //qar represents query after recovery
resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
- "http://"+host+":"+port+"/query", cUnit.getParameter());
+ "http://" + host + ":" + port + "/query", cUnit.getParameter());
qarFile = new File(actualPath + File.separator
+ testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
+ cUnit.getName() + "_qar.adm");
@@ -528,7 +532,7 @@ public class TestExecutor {
break;
case "txneu": //eu represents erroneous update
try {
- executeUpdate(statement, "http://"+host+":"+port+"/update");
+ executeUpdate(statement, "http://" + host + ":" + port + "/update");
} catch (Exception e) {
//An exception is expected.
failed = true;
@@ -556,7 +560,7 @@ public class TestExecutor {
break;
case "errddl": // a ddlquery that expects error
try {
- executeDDL(statement, "http://"+host+":"+port+"/ddl");
+ executeDDL(statement, "http://" + host + ":" + port + "/ddl");
} catch (Exception e) {
// expected error happens
failed = true;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
index c92262c..29765fd 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
@@ -41,7 +41,7 @@ import org.kohsuke.args4j.CmdLineParser;
public class EventDriver {
public static final String CLIENT_NODE_ID = "client_node";
- public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null, null);
+ public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null);
private static String eventsDir;
private static Events events;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
index d6e7da0..b83faa2 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
@@ -191,7 +191,7 @@ public class EventUtil {
String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster
.getMasterNode().getJavaHome();
return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir,
- null, null, null, cluster.getMasterNode().getDebugPort());
+ null, null, cluster.getMasterNode().getDebugPort());
}
List<Node> nodeList = cluster.getNode();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
index 33ba787..4bd5098 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
@@ -45,7 +45,6 @@ import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
-import org.apache.commons.io.IOUtils;
import org.apache.asterix.common.configuration.AsterixConfiguration;
import org.apache.asterix.common.configuration.Coredump;
import org.apache.asterix.common.configuration.Store;
@@ -59,6 +58,7 @@ import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Env;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.event.schema.cluster.Property;
+import org.apache.commons.io.IOUtils;
public class AsterixEventServiceUtil {
@@ -90,8 +90,8 @@ public class AsterixEventServiceUtil {
return instance;
}
- public static void createAsterixZip(AsterixInstance asterixInstance) throws IOException, InterruptedException,
- JAXBException, EventException {
+ public static void createAsterixZip(AsterixInstance asterixInstance)
+ throws IOException, InterruptedException, JAXBException, EventException {
String asterixInstanceDir = asterixInstanceDir(asterixInstance);
unzip(AsterixEventService.getAsterixZip(), asterixInstanceDir);
@@ -128,18 +128,18 @@ public class AsterixEventServiceUtil {
clusterProperties.add(new Property(EventUtil.CC_JAVA_OPTS, ccJavaOpts));
clusterProperties.add(new Property(EventUtil.NC_JAVA_OPTS, ncJavaOpts));
- clusterProperties.add(new Property("ASTERIX_HOME", cluster.getWorkingDir().getDir() + File.separator
- + "asterix"));
+ clusterProperties
+ .add(new Property("ASTERIX_HOME", cluster.getWorkingDir().getDir() + File.separator + "asterix"));
clusterProperties.add(new Property("LOG_DIR", cluster.getLogDir()));
clusterProperties.add(new Property("JAVA_HOME", cluster.getJavaHome()));
clusterProperties.add(new Property("WORKING_DIR", cluster.getWorkingDir().getDir()));
clusterProperties.add(new Property("CLIENT_NET_IP", cluster.getMasterNode().getClientIp()));
clusterProperties.add(new Property("CLUSTER_NET_IP", cluster.getMasterNode().getClusterIp()));
- int clusterNetPort = cluster.getMasterNode().getClusterPort() != null ? cluster.getMasterNode()
- .getClusterPort().intValue() : CLUSTER_NET_PORT_DEFAULT;
- int clientNetPort = cluster.getMasterNode().getClientPort() != null ? cluster.getMasterNode().getClientPort()
- .intValue() : CLIENT_NET_PORT_DEFAULT;
+ int clusterNetPort = cluster.getMasterNode().getClusterPort() != null
+ ? cluster.getMasterNode().getClusterPort().intValue() : CLUSTER_NET_PORT_DEFAULT;
+ int clientNetPort = cluster.getMasterNode().getClientPort() != null
+ ? cluster.getMasterNode().getClientPort().intValue() : CLIENT_NET_PORT_DEFAULT;
int httpPort = cluster.getMasterNode().getHttpPort() != null ? cluster.getMasterNode().getHttpPort().intValue()
: HTTP_PORT_DEFAULT;
@@ -151,8 +151,8 @@ public class AsterixEventServiceUtil {
}
private static String asterixZipName() {
- return AsterixEventService.getAsterixZip().substring(
- AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
+ return AsterixEventService.getAsterixZip()
+ .substring(AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
}
private static String asterixJarPath(AsterixInstance asterixInstance, String asterixInstanceDir) {
@@ -174,8 +174,8 @@ public class AsterixEventServiceUtil {
new File(asterixInstanceDir + File.separator + ASTERIX_CONFIGURATION_FILE).delete();
}
- private static void injectAsterixClusterConfigurationFile(String asterixInstanceDir, AsterixInstance asterixInstance)
- throws IOException, EventException, JAXBException {
+ private static void injectAsterixClusterConfigurationFile(String asterixInstanceDir,
+ AsterixInstance asterixInstance) throws IOException, EventException, JAXBException {
File sourceJar = new File(asterixJarPath(asterixInstance, asterixInstanceDir));
writeAsterixClusterConfigurationFile(asterixInstance);
@@ -185,8 +185,8 @@ public class AsterixEventServiceUtil {
new File(asterixInstanceDir + File.separator + CLUSTER_CONFIGURATION_FILE).delete();
}
- private static void writeAsterixClusterConfigurationFile(AsterixInstance asterixInstance) throws IOException,
- EventException, JAXBException {
+ private static void writeAsterixClusterConfigurationFile(AsterixInstance asterixInstance)
+ throws IOException, EventException, JAXBException {
String asterixInstanceName = asterixInstance.getName();
Cluster cluster = asterixInstance.getCluster();
@@ -197,8 +197,8 @@ public class AsterixEventServiceUtil {
+ asterixInstanceName + File.separator + "cluster.xml"));
}
- public static void addLibraryToAsterixZip(AsterixInstance asterixInstance, String dataverseName,
- String libraryName, String libraryPath) throws IOException {
+ public static void addLibraryToAsterixZip(AsterixInstance asterixInstance, String dataverseName, String libraryName,
+ String libraryPath) throws IOException {
File instanceDir = new File(asterixInstanceDir(asterixInstance));
if (!instanceDir.exists()) {
instanceDir.mkdirs();
@@ -235,30 +235,8 @@ public class AsterixEventServiceUtil {
return metadataNode;
}
- public static String getNodeDirectories(String asterixInstanceName, Node node, Cluster cluster) {
- String storeDataSubDir = asterixInstanceName + File.separator + "data" + File.separator;
- String[] storeDirs = null;
- StringBuffer nodeDataStore = new StringBuffer();
- String storeDirValue = node.getStore();
- if (storeDirValue == null) {
- storeDirValue = cluster.getStore();
- if (storeDirValue == null) {
- throw new IllegalStateException(" Store not defined for node " + node.getId());
- }
- storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
- }
-
- storeDirs = storeDirValue.split(",");
- for (String ns : storeDirs) {
- nodeDataStore.append(ns + File.separator + storeDataSubDir.trim());
- nodeDataStore.append(",");
- }
- nodeDataStore.deleteCharAt(nodeDataStore.length() - 1);
- return nodeDataStore.toString();
- }
-
- private static void writeAsterixConfigurationFile(AsterixInstance asterixInstance) throws IOException,
- JAXBException {
+ private static void writeAsterixConfigurationFile(AsterixInstance asterixInstance)
+ throws IOException, JAXBException {
String asterixInstanceName = asterixInstance.getName();
Cluster cluster = asterixInstance.getCluster();
String metadataNodeId = asterixInstance.getMetadataNodeId();
@@ -266,29 +244,34 @@ public class AsterixEventServiceUtil {
AsterixConfiguration configuration = asterixInstance.getAsterixConfiguration();
configuration.setInstanceName(asterixInstanceName);
configuration.setMetadataNode(asterixInstanceName + "_" + metadataNodeId);
- String storeDir = null;
List<Store> stores = new ArrayList<Store>();
+ String storeDir = cluster.getStore().trim();
for (Node node : cluster.getNode()) {
- storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
- stores.add(new Store(asterixInstanceName + "_" + node.getId(), storeDir));
+ String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
+ String[] nodeIdDevice = iodevices.split(",");
+ StringBuilder nodeStores = new StringBuilder();
+ for (int i = 0; i < nodeIdDevice.length; i++) {
+ nodeStores.append(nodeIdDevice[i] + File.separator + storeDir + ",");
+ }
+ //remove last comma
+ nodeStores.deleteCharAt(nodeStores.length() - 1);
+ stores.add(new Store(asterixInstanceName + "_" + node.getId(), nodeStores.toString()));
}
configuration.setStore(stores);
-
List<Coredump> coredump = new ArrayList<Coredump>();
String coredumpDir = null;
List<TransactionLogDir> txnLogDirs = new ArrayList<TransactionLogDir>();
String txnLogDir = null;
for (Node node : cluster.getNode()) {
coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
- coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(), coredumpDir + File.separator
- + asterixInstanceName + "_" + node.getId()));
+ coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(),
+ coredumpDir + File.separator + asterixInstanceName + "_" + node.getId()));
txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir();
txnLogDirs.add(new TransactionLogDir(asterixInstanceName + "_" + node.getId(), txnLogDir));
}
configuration.setCoredump(coredump);
configuration.setTransactionLogDir(txnLogDirs);
-
File asterixConfDir = new File(AsterixEventService.getAsterixDir() + File.separator + asterixInstanceName);
asterixConfDir.mkdirs();
@@ -300,8 +283,6 @@ public class AsterixEventServiceUtil {
os.close();
}
-
-
public static void unzip(String sourceFile, String destDir) throws IOException {
BufferedOutputStream dest = null;
FileInputStream fis = new FileInputStream(sourceFile);
@@ -432,8 +413,8 @@ public class AsterixEventServiceUtil {
}
}
if (!valid) {
- throw new EventException("Asterix instance by the name " + name + " is in " + instance.getState()
- + " state ");
+ throw new EventException(
+ "Asterix instance by the name " + name + " is in " + instance.getState() + " state ");
}
return instance;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
index b6aaddb..6085019 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
@@ -164,11 +164,11 @@ public class PatternCreator {
String store;
String pargs;
String iodevices;
+ store = cluster.getStore();
List<Pattern> patternList = new ArrayList<Pattern>();
for (Node node : cluster.getNode()) {
Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
iodevices = node.getIodevices() == null ? instance.getCluster().getIodevices() : node.getIodevices();
- store = node.getStore() == null ? cluster.getStore() : node.getStore();
pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + store + " "
+ AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
+ backupId + " " + hdfsBackupDir + " " + "hdfs" + " " + node.getId() + " " + hdfsUrl + " "
@@ -188,12 +188,12 @@ public class PatternCreator {
String txnLogDir;
String store;
String pargs;
+ store = cluster.getStore();
List<Pattern> patternList = new ArrayList<Pattern>();
for (Node node : cluster.getNode()) {
Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
iodevices = node.getIodevices() == null ? instance.getCluster().getIodevices() : node.getIodevices();
txnLogDir = node.getTxnLogDir() == null ? instance.getCluster().getTxnLogDir() : node.getTxnLogDir();
- store = node.getStore() == null ? cluster.getStore() : node.getStore();
pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + store + " "
+ AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + txnLogDir + " " + backupId + " " + backupDir
+ " " + "local" + " " + node.getId();
@@ -212,14 +212,12 @@ public class PatternCreator {
VerificationUtil.verifyBackupRestoreConfiguration(hdfsUrl, hadoopVersion, hdfsBackupDir);
String workingDir = cluster.getWorkingDir().getDir();
int backupId = backupInfo.getId();
- String nodeStore;
String pargs;
List<Pattern> patternList = new ArrayList<Pattern>();
for (Node node : cluster.getNode()) {
Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
- nodeStore = node.getStore() == null ? clusterStore : node.getStore();
- pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + nodeStore + " "
+ pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + clusterStore + " "
+ AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
+ backupId + " " + " " + hdfsBackupDir + " " + "hdfs" + " " + node.getId() + " " + hdfsUrl + " "
+ hadoopVersion;
@@ -235,14 +233,12 @@ public class PatternCreator {
String backupDir = backupInfo.getBackupConf().getBackupDir();
String workingDir = cluster.getWorkingDir().getDir();
int backupId = backupInfo.getId();
- String nodeStore;
String pargs;
List<Pattern> patternList = new ArrayList<Pattern>();
for (Node node : cluster.getNode()) {
Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
- nodeStore = node.getStore() == null ? clusterStore : node.getStore();
- pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + nodeStore + " "
+ pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + clusterStore + " "
+ AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
+ backupId + " " + backupDir + " " + "local" + " " + node.getId();
Event event = new Event("restore", nodeid, pargs);
@@ -262,8 +258,8 @@ public class PatternCreator {
Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
String username = cluster.getUsername() != null ? cluster.getUsername() : System.getProperty("user.name");
- String pargs = username + " " + hadoopDir.getAbsolutePath() + " " + cluster.getMasterNode().getClusterIp()
- + " " + workingDir;
+ String pargs = username + " " + hadoopDir.getAbsolutePath() + " " + cluster.getMasterNode().getClusterIp() + " "
+ + workingDir;
Event event = new Event("directory_transfer", nodeid, pargs);
Pattern p = new Pattern(null, 1, null, event);
addInitialDelay(p, 2, "sec");
@@ -428,8 +424,8 @@ public class PatternCreator {
patternList.add(p);
}
- pargs = username + " " + fileToTransfer + " " + cluster.getMasterNode().getClusterIp() + " " + destDir
- + " " + "unpack";
+ pargs = username + " " + fileToTransfer + " " + cluster.getMasterNode().getClusterIp() + " " + destDir + " "
+ + "unpack";
event = new Event("file_transfer", nodeid, pargs);
p = new Pattern(null, 1, null, event);
patternList.add(p);
@@ -529,8 +525,8 @@ public class PatternCreator {
String[] nodeIODevices;
String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
nodeIODevices = iodevices.trim().split(",");
+ String nodeStore = cluster.getStore().trim();
for (String nodeIODevice : nodeIODevices) {
- String nodeStore = node.getStore() == null ? cluster.getStore() : node.getStore();
pargs = nodeIODevice.trim() + File.separator + nodeStore;
Event event = new Event("file_delete", nodeid, pargs);
patternList.add(new Pattern(null, 1, null, event));
@@ -540,13 +536,15 @@ public class PatternCreator {
return patterns;
}
- private Pattern createCopyHyracksPattern(String instanceName, Cluster cluster, String destinationIp, String destDir) {
+ private Pattern createCopyHyracksPattern(String instanceName, Cluster cluster, String destinationIp,
+ String destDir) {
Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
String username = cluster.getUsername() != null ? cluster.getUsername() : System.getProperty("user.name");
- String asterixZipName = AsterixEventService.getAsterixZip().substring(
- AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
- String fileToTransfer = new File(AsterixEventService.getAsterixDir() + File.separator + instanceName
- + File.separator + asterixZipName).getAbsolutePath();
+ String asterixZipName = AsterixEventService.getAsterixZip()
+ .substring(AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
+ String fileToTransfer = new File(
+ AsterixEventService.getAsterixDir() + File.separator + instanceName + File.separator + asterixZipName)
+ .getAbsolutePath();
String pargs = username + " " + fileToTransfer + " " + destinationIp + " " + destDir + " " + "unpack";
Event event = new Event("file_transfer", nodeid, pargs);
return new Pattern(null, 1, null, event);
@@ -607,8 +605,8 @@ public class PatternCreator {
ps.add(p);
nodeid = new Nodeid(new Value(null, nodeToBeAdded.getId()));
- pargs = cluster.getUsername() + " " + hadoopDir.getAbsolutePath() + " " + nodeToBeAdded.getClusterIp()
- + " " + workingDir;
+ pargs = cluster.getUsername() + " " + hadoopDir.getAbsolutePath() + " " + nodeToBeAdded.getClusterIp() + " "
+ + workingDir;
event = new Event("directory_transfer", nodeid, pargs);
p = new Pattern(null, 1, null, event);
addInitialDelay(p, 2, "sec");
@@ -626,8 +624,8 @@ public class PatternCreator {
String username = cluster.getUsername() == null ? System.getProperty("user.name") : cluster.getUsername();
String srcHost = cluster.getMasterNode().getClientIp();
Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
- String srcDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir() : cluster.getMasterNode()
- .getLogDir();
+ String srcDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir()
+ : cluster.getMasterNode().getLogDir();
String destDir = outputDir + File.separator + "cc";
String pargs = username + " " + srcHost + " " + srcDir + " " + destDir;
Event event = new Event("directory_copy", nodeid, pargs);
@@ -649,7 +647,7 @@ public class PatternCreator {
Patterns patterns = new Patterns(patternList);
return patterns;
}
-
+
private Patterns createRemoveAsterixReplicationPattern(AsterixInstance instance) throws Exception {
List<Pattern> patternList = new ArrayList<Pattern>();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
index ebf41cc..c4a96f4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -34,7 +34,6 @@ import org.apache.asterix.external.indexing.dataflow.HDFSObjectTupleParserFactor
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import org.apache.hadoop.fs.BlockLocation;
@@ -211,12 +210,10 @@ public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IAd
Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
for (String i : stores.keySet()) {
String[] nodeStores = stores.get(i);
- int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- locs.add(i);
- locs.add(i);
- }
+ //two readers per partition
+ locs.add(i);
+ locs.add(i);
}
}
String[] cluster = new String[locs.size()];
@@ -273,67 +270,67 @@ public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IAd
* @throws IOException
*/
protected InputSplit[] getSplits(JobConf conf) throws IOException {
- // Create file system object
- FileSystem fs = FileSystem.get(conf);
ArrayList<FileSplit> fileSplits = new ArrayList<FileSplit>();
ArrayList<ExternalFile> orderedExternalFiles = new ArrayList<ExternalFile>();
- // Create files splits
- for (ExternalFile file : files) {
- Path filePath = new Path(file.getFileName());
- FileStatus fileStatus;
- try {
- fileStatus = fs.getFileStatus(filePath);
- } catch (FileNotFoundException e) {
- // file was deleted at some point, skip to next file
- continue;
- }
- if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP
- && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
- // Get its information from HDFS name node
- BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize());
- // Create a split per block
- for (BlockLocation block : fileBlocks) {
- if (block.getOffset() < file.getSize()) {
- fileSplits
- .add(new FileSplit(filePath,
- block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
- ? block.getLength() : (file.getSize() - block.getOffset()),
- block.getHosts()));
- orderedExternalFiles.add(file);
- }
- }
- } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP
- && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
- long oldSize = 0L;
- long newSize = file.getSize();
- for (int i = 0; i < files.size(); i++) {
- if (files.get(i).getFileName() == file.getFileName() && files.get(i).getSize() != file.getSize()) {
- newSize = files.get(i).getSize();
- oldSize = file.getSize();
- break;
- }
+ // Create file system object
+ try (FileSystem fs = FileSystem.get(conf)) {
+ // Create files splits
+ for (ExternalFile file : files) {
+ Path filePath = new Path(file.getFileName());
+ FileStatus fileStatus;
+ try {
+ fileStatus = fs.getFileStatus(filePath);
+ } catch (FileNotFoundException e) {
+ // file was deleted at some point, skip to next file
+ continue;
}
-
- // Get its information from HDFS name node
- BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize);
- // Create a split per block
- for (BlockLocation block : fileBlocks) {
- if (block.getOffset() + block.getLength() > oldSize) {
- if (block.getOffset() < newSize) {
- // Block interact with delta -> Create a split
- long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
- long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
- : block.getOffset() + block.getLength() - newSize;
- long splitLength = block.getLength() - startCut - endCut;
- fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
+ if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP
+ && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
+ // Get its information from HDFS name node
+ BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize());
+ // Create a split per block
+ for (BlockLocation block : fileBlocks) {
+ if (block.getOffset() < file.getSize()) {
+ fileSplits.add(new FileSplit(filePath,
+ block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
+ ? block.getLength() : (file.getSize() - block.getOffset()),
block.getHosts()));
orderedExternalFiles.add(file);
}
}
+ } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP
+ && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
+ long oldSize = 0L;
+ long newSize = file.getSize();
+ for (int i = 0; i < files.size(); i++) {
+ if (files.get(i).getFileName() == file.getFileName()
+ && files.get(i).getSize() != file.getSize()) {
+ newSize = files.get(i).getSize();
+ oldSize = file.getSize();
+ break;
+ }
+ }
+
+ // Get its information from HDFS name node
+ BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize);
+ // Create a split per block
+ for (BlockLocation block : fileBlocks) {
+ if (block.getOffset() + block.getLength() > oldSize) {
+ if (block.getOffset() < newSize) {
+ // Block interact with delta -> Create a split
+ long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
+ long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
+ : block.getOffset() + block.getLength() - newSize;
+ long splitLength = block.getLength() - startCut - endCut;
+ fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
+ block.getHosts()));
+ orderedExternalFiles.add(file);
+ }
+ }
+ }
}
}
}
- fs.close();
files = orderedExternalFiles;
return fileSplits.toArray(new FileSplit[fileSplits.size()]);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
index 11e2b96..8bf6d93 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
@@ -32,7 +32,6 @@ import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.AUnionType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.om.util.NonTaggedFormatUtil;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
import org.apache.asterix.runtime.operators.file.DelimitedDataParser;
@@ -186,11 +185,8 @@ public class HDFSIndexingAdapterFactory extends HDFSAdapterFactory {
Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
for (String i : stores.keySet()) {
String[] nodeStores = stores.get(i);
- int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
for (int j = 0; j < nodeStores.length; j++) {
- for (int k = 0; k < numIODevices; k++) {
- locs.add(i);
- }
+ locs.add(i);
}
}
String[] cluster = new String[locs.size()];
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
index bce4620..6ff991b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.external.indexing.operators;
-import java.io.File;
import java.util.List;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -49,9 +48,7 @@ public class ExternalDatasetIndexesAbortOperatorDescriptor extends AbstractExter
@Override
protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
- FileReference file = new FileReference(new File(IndexFileNameUtil.prepareFileName(fileIndexInfo
- .getFileSplitProvider().getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo
- .getFileSplitProvider().getFileSplits()[partition].getIODeviceId())));
+ FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
fileManager.deleteTransactionFiles();
}