You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/09/12 09:13:34 UTC
[2/3] asterixdb git commit: Small Cleanup Towards Fixing LifeCycle
Issues
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
new file mode 100644
index 0000000..34ccce0
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class IndexInfo extends Info {
+ private final ILSMIndex index;
+ private final long resourceId;
+ private final int datasetId;
+
+ public IndexInfo(ILSMIndex index, int datasetId, long resourceId) {
+ this.index = index;
+ this.datasetId = datasetId;
+ this.resourceId = resourceId;
+ }
+
+ public ILSMIndex getIndex() {
+ return index;
+ }
+
+ public long getResourceId() {
+ return resourceId;
+ }
+
+ public int getDatasetId() {
+ return datasetId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/Info.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/Info.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/Info.java
new file mode 100644
index 0000000..999eb34
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/Info.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+public abstract class Info {
+ private int referenceCount;
+ private boolean isOpen;
+
+ public Info() {
+ referenceCount = 0;
+ isOpen = false;
+ }
+
+ public void touch() {
+ ++referenceCount;
+ }
+
+ public void untouch() {
+ --referenceCount;
+ }
+
+ public int getReferenceCount() {
+ return referenceCount;
+ }
+
+ public void setReferenceCount(int referenceCount) {
+ this.referenceCount = referenceCount;
+ }
+
+ public boolean isOpen() {
+ return isOpen;
+ }
+
+ public void setOpen(boolean isOpen) {
+ this.isOpen = isOpen;
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 71c30d5..6350d73 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -22,7 +22,6 @@ package org.apache.asterix.common.context;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.transactions.AbstractOperationCallback;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
deleted file mode 100644
index 5737957..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
-
-public abstract class AbstractApplicationMessage implements IApplicationMessage {
- private static final long serialVersionUID = 1L;
- protected long id;
-
- @Override
- public void setId(long id) {
- this.id = id;
- }
-
- @Override
- public long getId() {
- return id;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index b93082d..dbd2139 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -22,31 +22,11 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.service.IControllerService;
+@FunctionalInterface
public interface IApplicationMessage extends IMessage {
/**
- * Sets a unique message id that identifies this message within an NC.
- * This id is set by {@link INCMessageBroker#sendMessageToCC(IApplicationMessage, IApplicationMessageCallback)}
- * when the callback is not null to notify the sender when the response to that message is received.
- *
- * @param messageId
- */
- public void setId(long messageId);
-
- /**
- * @return The unique message id if it has been set, otherwise 0.
- */
- public long getId();
-
- /**
* handle the message upon delivery
*/
- public void handle(IControllerService cs) throws HyracksDataException;
-
- /**
- * get a string representation for the message type
- *
- * @return
- */
- public String type();
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java
deleted file mode 100644
index 3bad5fb..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging.api;
-
-public interface IApplicationMessageCallback {
-
- /**
- * Notifies the message sender when the response has been received.
- *
- * @param message
- * The response message
- */
- public void deliverMessageResponse(IApplicationMessage message);
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index 2290b93..ced2b6d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -21,7 +21,6 @@ package org.apache.asterix.common.messaging.api;
import org.apache.hyracks.api.messages.IMessageBroker;
public interface ICCMessageBroker extends IMessageBroker {
- public static final long NO_CALLBACK_MESSAGE_ID = -1;
/**
* Sends the passed message to the specified {@code nodeId}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index f01d0c3..707f864 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -29,7 +29,7 @@ public interface INCMessageBroker extends IMessageBroker {
* @param callback
* @throws Exception
*/
- public void sendMessageToCC(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception;
+ public void sendMessageToCC(IApplicationMessage message) throws Exception;
/**
* Sends application message from this NC to another NC.
@@ -38,7 +38,7 @@ public interface INCMessageBroker extends IMessageBroker {
* @param callback
* @throws Exception
*/
- public void sendMessageToNC(String nodeId, IApplicationMessage message, IApplicationMessageCallback callback)
+ public void sendMessageToNC(String nodeId, IApplicationMessage message)
throws Exception;
/**
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index d0dd3fa..b66bcf8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -27,7 +27,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.IResourceIdFactory;
public interface IAsterixAppRuntimeContextProvider {
@@ -49,8 +48,6 @@ public interface IAsterixAppRuntimeContextProvider {
public ILocalResourceRepository getLocalResourceRepository();
- public IResourceIdFactory getResourceIdFactory();
-
public IIOManager getIOManager();
public IAsterixAppRuntimeContext getAppContext();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index c929b41..6de796e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -407,7 +407,7 @@ public class MetadataNode implements IMetadataNode {
throws ACIDException, HyracksDataException, IndexException {
long resourceID = metadataIndex.getResourceID();
String resourceName = metadataIndex.getFile().toString();
- ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.getIndex(resourceName);
+ ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
try {
datasetLifecycleManager.open(resourceName);
@@ -674,7 +674,7 @@ public class MetadataNode implements IMetadataNode {
throws ACIDException, HyracksDataException, IndexException {
long resourceID = metadataIndex.getResourceID();
String resourceName = metadataIndex.getFile().toString();
- ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.getIndex(resourceName);
+ ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
try {
datasetLifecycleManager.open(resourceName);
// prepare a Callback for logging
@@ -1066,7 +1066,7 @@ public class MetadataNode implements IMetadataNode {
try {
IMetadataIndex index = MetadataPrimaryIndexes.DATAVERSE_DATASET;
String resourceName = index.getFile().toString();
- IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+ IIndex indexInstance = datasetLifecycleManager.get(resourceName);
datasetLifecycleManager.open(resourceName);
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
@@ -1088,7 +1088,7 @@ public class MetadataNode implements IMetadataNode {
datasetLifecycleManager.close(resourceName);
index = MetadataPrimaryIndexes.DATASET_DATASET;
- indexInstance = datasetLifecycleManager.getIndex(resourceName);
+ indexInstance = datasetLifecycleManager.get(resourceName);
datasetLifecycleManager.open(resourceName);
indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
@@ -1113,7 +1113,7 @@ public class MetadataNode implements IMetadataNode {
datasetLifecycleManager.close(resourceName);
index = MetadataPrimaryIndexes.INDEX_DATASET;
- indexInstance = datasetLifecycleManager.getIndex(resourceName);
+ indexInstance = datasetLifecycleManager.get(resourceName);
datasetLifecycleManager.open(resourceName);
indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
@@ -1149,7 +1149,7 @@ public class MetadataNode implements IMetadataNode {
throw new MetadataException("No file for Index " + index.getDataverseName() + "." + index.getIndexName());
}
String resourceName = index.getFile().toString();
- IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+ IIndex indexInstance = datasetLifecycleManager.get(resourceName);
datasetLifecycleManager.open(resourceName);
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
@@ -1187,7 +1187,7 @@ public class MetadataNode implements IMetadataNode {
int mostRecentDatasetId = MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID;
try {
String resourceName = MetadataPrimaryIndexes.DATASET_DATASET.getFile().toString();
- IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+ IIndex indexInstance = datasetLifecycleManager.get(resourceName);
datasetLifecycleManager.open(resourceName);
try {
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 2629fea..55b1045 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -32,9 +32,9 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.ILocalResourceMetadata;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.AsterixMetadataProperties;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.context.BaseOperationTracker;
import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
@@ -59,10 +59,10 @@ import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.FeedPolicyEntity;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.Node;
-import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.metadata.entities.InternalDatasetDetails.FileStructure;
import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.metadata.entities.Node;
+import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.om.types.BuiltinType;
@@ -319,6 +319,8 @@ public class MetadataBootstrap {
String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
FileReference file = ioManager.getAbsoluteFileRef(metadataDeviceId, resourceName);
+ // this should not be done this way. dataset lifecycle manager shouldn't return virtual buffer caches for
+ // a dataset that was not yet created
List<IVirtualBufferCache> virtualBufferCaches = runtimeContext.getDatasetLifecycleManager()
.getVirtualBufferCaches(index.getDatasetId().getId(), metadataPartition.getIODeviceNum());
ITypeTraits[] typeTraits = index.getTypeTraits();
@@ -360,7 +362,7 @@ public class MetadataBootstrap {
+ " to intialize as a new instance. (WARNING: all data will be lost.)");
}
resourceID = resource.getResourceId();
- lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(absolutePath);
+ lsmBtree = (LSMBTree) dataLifecycleManager.get(absolutePath);
if (lsmBtree == null) {
lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
typeTraits, comparatorFactories, bloomFilterKeyFields,
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 5a88a9f..c0d8320 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -46,7 +46,7 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.AsterixReplicationProperties;
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
-import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;
+import org.apache.asterix.common.context.IndexInfo;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
@@ -59,7 +59,6 @@ import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.asterix.replication.functions.ReplicaFilesRequest;
import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
@@ -70,6 +69,7 @@ import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask;
import org.apache.asterix.replication.storage.LSMComponentProperties;
import org.apache.asterix.replication.storage.LSMIndexFileProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
index 2eb1be9..7c386d5 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
@@ -18,9 +18,9 @@
*/
package org.apache.asterix.runtime.message;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
-public abstract class AbstractFailbackPlanMessage extends AbstractApplicationMessage {
+public abstract class AbstractFailbackPlanMessage implements IApplicationMessage {
private static final long serialVersionUID = 1L;
protected final long planId;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
index c96bcb8..5cb1a6a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
@@ -55,7 +55,8 @@ public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("Plan ID: " + planId);
+ sb.append(CompleteFailbackRequestMessage.class.getSimpleName());
+ sb.append(" Plan ID: " + planId);
sb.append(" Node ID: " + nodeId);
sb.append(" Partitions: " + partitions);
return sb.toString();
@@ -78,7 +79,7 @@ public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage
CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(planId,
requestId, partitions);
try {
- broker.sendMessageToCC(reponse, null);
+ broker.sendMessageToCC(reponse);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failure sending message to CC", e);
hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
@@ -88,9 +89,4 @@ public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage
throw hde;
}
}
-
- @Override
- public String type() {
- return "COMPLETE_FAILBACK_REQUEST";
- }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
index 3c4b58c..73e5fd2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
@@ -41,7 +41,8 @@ public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("Plan ID: " + planId);
+ sb.append(CompleteFailbackResponseMessage.class.getSimpleName());
+ sb.append(" Plan ID: " + planId);
sb.append(" Partitions: " + partitions);
return sb.toString();
}
@@ -50,9 +51,4 @@ public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage
public void handle(IControllerService cs) throws HyracksDataException {
AsterixClusterProperties.INSTANCE.processCompleteFailbackResponse(this);
}
-
- @Override
- public String type() {
- return "COMPLETE_FAILBACK_RESPONSE";
- }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
index e3b9fbe..c112366 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
@@ -64,7 +64,8 @@ public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPla
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("Plan ID: " + planId);
+ sb.append(PreparePartitionsFailbackRequestMessage.class.getSimpleName());
+ sb.append(" Plan ID: " + planId);
sb.append(" Partitions: " + partitions);
sb.append(" releaseMetadataNode: " + releaseMetadataNode);
return sb.toString();
@@ -109,15 +110,10 @@ public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPla
PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(planId,
requestId, partitions);
try {
- broker.sendMessageToCC(reponse, null);
+ broker.sendMessageToCC(reponse);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
throw ExceptionUtils.convertToHyracksDataException(e);
}
}
-
- @Override
- public String type() {
- return "PREPARE_PARTITIONS_FAILBACK_REQUEST";
- }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
index 2e52773..299121a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
@@ -44,7 +44,7 @@ public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPl
}
@Override
- public String type() {
- return "PREPARE_PARTITIONS_FAILBACK_RESPONSE";
+ public String toString() {
+ return PreparePartitionsFailbackResponseMessage.class.getSimpleName() + " " + partitions.toString();
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
index 2aa4746..fc55968 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
@@ -19,7 +19,7 @@
package org.apache.asterix.runtime.message;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.common.replication.ReplicaEvent;
import org.apache.asterix.event.schema.cluster.Node;
@@ -28,7 +28,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
-public class ReplicaEventMessage extends AbstractApplicationMessage {
+public class ReplicaEventMessage implements IApplicationMessage {
private static final long serialVersionUID = 1L;
private final String nodeId;
@@ -66,7 +66,7 @@ public class ReplicaEventMessage extends AbstractApplicationMessage {
}
@Override
- public String type() {
- return "REPLICA_EVENT";
+ public String toString() {
+ return ReplicaEventMessage.class.getSimpleName();
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
index 1604e29..f9f6233 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -23,7 +23,7 @@ import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
@@ -32,7 +32,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
-public class ReportMaxResourceIdMessage extends AbstractApplicationMessage {
+public class ReportMaxResourceIdMessage implements IApplicationMessage {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = Logger.getLogger(ReportMaxResourceIdMessage.class.getName());
private final long maxResourceId;
@@ -62,7 +62,7 @@ public class ReportMaxResourceIdMessage extends AbstractApplicationMessage {
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage(ncs.getId(), maxResourceId);
try {
- ((INCMessageBroker) ncs.getApplicationContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg, null);
+ ((INCMessageBroker) ncs.getApplicationContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e);
throw ExceptionUtils.convertToHyracksDataException(e);
@@ -70,7 +70,7 @@ public class ReportMaxResourceIdMessage extends AbstractApplicationMessage {
}
@Override
- public String type() {
- return "REPORT_MAX_RESOURCE_ID_RESPONSE";
+ public String toString() {
+ return ReportMaxResourceIdMessage.class.getSimpleName();
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
index 3e2becf..203104e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
@@ -18,12 +18,12 @@
*/
package org.apache.asterix.runtime.message;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
-public class ReportMaxResourceIdRequestMessage extends AbstractApplicationMessage {
+public class ReportMaxResourceIdRequestMessage implements IApplicationMessage {
private static final long serialVersionUID = 1L;
@Override
@@ -32,7 +32,7 @@ public class ReportMaxResourceIdRequestMessage extends AbstractApplicationMessag
}
@Override
- public String type() {
- return "REPORT_MAX_RESOURCE_ID_REQUEST";
+ public String toString() {
+ return ReportMaxResourceIdRequestMessage.class.getSimpleName();
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 5e7d808..32d3f64 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -21,7 +21,7 @@ package org.apache.asterix.runtime.message;
import java.util.Set;
import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
import org.apache.asterix.runtime.util.AsterixAppContextInfo;
@@ -29,7 +29,7 @@ import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-public class ResourceIdRequestMessage extends AbstractApplicationMessage {
+public class ResourceIdRequestMessage implements IApplicationMessage {
private static final long serialVersionUID = 1L;
private final String src;
@@ -43,7 +43,6 @@ public class ResourceIdRequestMessage extends AbstractApplicationMessage {
ICCMessageBroker broker =
(ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
- reponse.setId(id);
if (!AsterixClusterProperties.INSTANCE.isClusterActive()) {
reponse.setResourceId(-1);
reponse.setException(new Exception("Cannot generate global resource id when cluster is not active."));
@@ -66,7 +65,6 @@ public class ResourceIdRequestMessage extends AbstractApplicationMessage {
throws Exception {
Set<String> getParticipantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
- msg.setId(ICCMessageBroker.NO_CALLBACK_MESSAGE_ID);
for (String nodeId : getParticipantNodes) {
if (!resourceIdManager.reported(nodeId)) {
broker.sendApplicationMessageToNC(msg, nodeId);
@@ -75,7 +73,7 @@ public class ResourceIdRequestMessage extends AbstractApplicationMessage {
}
@Override
- public String type() {
- return "RESOURCE_ID_REQUEST";
+ public String toString() {
+ return ReportMaxResourceIdRequestMessage.class.getSimpleName();
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
index 62c2163..d4cc022 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
@@ -18,11 +18,14 @@
*/
package org.apache.asterix.runtime.message;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.runtime.transaction.GlobalResourceIdFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
-public class ResourceIdRequestResponseMessage extends AbstractApplicationMessage {
+public class ResourceIdRequestResponseMessage implements IApplicationMessage {
private static final long serialVersionUID = 1L;
private long resourceId;
@@ -45,13 +48,15 @@ public class ResourceIdRequestResponseMessage extends AbstractApplicationMessage
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
- // Do nothing. for this message, the callback handles it, we probably should get rid of callbacks and
- // instead, use the handle in the response to perform callback action
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAsterixAppRuntimeContext asterixNcAppRuntimeCtx =
+ (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ ((GlobalResourceIdFactory) asterixNcAppRuntimeCtx.getResourceIdFactory()).addNewIds(this);
}
@Override
- public String type() {
- return "RESOURCE_ID_RESPONSE";
+ public String toString() {
+ return ResourceIdRequestResponseMessage.class.getSimpleName();
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
index b18a879..e877f52 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
@@ -23,13 +23,13 @@ import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
-public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
+public class TakeoverMetadataNodeRequestMessage implements IApplicationMessage {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName());
@@ -51,7 +51,7 @@ public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessa
TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
appContext.getTransactionSubsystem().getId());
try {
- broker.sendMessageToCC(reponse, null);
+ broker.sendMessageToCC(reponse);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
@@ -63,7 +63,7 @@ public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessa
}
@Override
- public String type() {
- return "TAKEOVER_METADATA_NODE_REQUEST";
+ public String toString() {
+ return TakeoverMetadataNodeRequestMessage.class.getSimpleName();
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
index f7016bc..e18fc8d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
@@ -18,12 +18,12 @@
*/
package org.apache.asterix.runtime.message;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMessage {
+public class TakeoverMetadataNodeResponseMessage implements IApplicationMessage {
private static final long serialVersionUID = 1L;
private final String nodeId;
@@ -42,7 +42,7 @@ public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMess
}
@Override
- public String type() {
- return "TAKEOVER_METADATA_NODE_RESPONSE";
+ public String toString() {
+ return TakeoverMetadataNodeResponseMessage.class.getSimpleName();
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
index 3b91084..e024eed 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
@@ -25,14 +25,14 @@ import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
-public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
+public class TakeoverPartitionsRequestMessage implements IApplicationMessage {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = Logger.getLogger(TakeoverPartitionsRequestMessage.class.getName());
@@ -61,7 +61,8 @@ public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("Request ID: " + requestId);
+ sb.append(TakeoverPartitionsRequestMessage.class.getSimpleName());
+ sb.append(" Request ID: " + requestId);
sb.append(" Node ID: " + nodeId);
sb.append(" Partitions: ");
for (Integer partitionId : partitions) {
@@ -92,7 +93,7 @@ public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage
TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(requestId,
appContext.getTransactionSubsystem().getId(), partitions);
try {
- broker.sendMessageToCC(reponse, null);
+ broker.sendMessageToCC(reponse);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
@@ -103,9 +104,4 @@ public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage
}
}
}
-
- @Override
- public String type() {
- return "TAKEOVER_PARTITIONS_REQUEST";
- }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
index 9ec71b7..1c57a2a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
@@ -18,12 +18,12 @@
*/
package org.apache.asterix.runtime.message;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessage {
+public class TakeoverPartitionsResponseMessage implements IApplicationMessage {
private static final long serialVersionUID = 1L;
private final Integer[] partitions;
@@ -54,7 +54,7 @@ public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessag
}
@Override
- public String type() {
- return "TAKEOVER_PARTITIONS_RESPONSE";
+ public String toString() {
+ return TakeoverPartitionsResponseMessage.class.getSimpleName();
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 4cb65c2..a8b61f7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -20,8 +20,6 @@ package org.apache.asterix.runtime.transaction;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.runtime.message.ResourceIdRequestMessage;
import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
@@ -34,10 +32,10 @@ import org.apache.hyracks.storage.common.file.IResourceIdFactory;
* A resource id factory that generates unique resource ids across all NCs by requesting
* unique ids from the cluster controller.
*/
-public class GlobalResourceIdFactory implements IResourceIdFactory, IApplicationMessageCallback {
+public class GlobalResourceIdFactory implements IResourceIdFactory {
private final IApplicationContext appCtx;
- private final LinkedBlockingQueue<IApplicationMessage> resourceIdResponseQ;
+ private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ;
private final String nodeId;
public GlobalResourceIdFactory(IApplicationContext appCtx) {
@@ -46,6 +44,10 @@ public class GlobalResourceIdFactory implements IResourceIdFactory, IApplication
this.nodeId = ((NodeControllerService) appCtx.getControllerService()).getApplicationContext().getNodeId();
}
+ public void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) throws InterruptedException {
+ resourceIdResponseQ.put(resourceIdResponse);
+ }
+
@Override
public long createId() throws HyracksDataException {
try {
@@ -54,15 +56,15 @@ public class GlobalResourceIdFactory implements IResourceIdFactory, IApplication
if (!resourceIdResponseQ.isEmpty()) {
synchronized (resourceIdResponseQ) {
if (!resourceIdResponseQ.isEmpty()) {
- reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
+ reponse = resourceIdResponseQ.take();
}
}
}
//if no response available or it has an exception, request a new one
if (reponse == null || reponse.getException() != null) {
ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId);
- ((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg, this);
- reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
+ ((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg);
+ reponse = resourceIdResponseQ.take();
if (reponse.getException() != null) {
throw new HyracksDataException(reponse.getException().getMessage());
}
@@ -72,9 +74,4 @@ public class GlobalResourceIdFactory implements IResourceIdFactory, IApplication
throw new HyracksDataException(e);
}
}
-
- @Override
- public void deliverMessageResponse(IApplicationMessage message) {
- resourceIdResponseQ.offer(message);
- }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
index d201d60..d9a55ad 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
@@ -411,7 +411,7 @@ public class AsterixClusterProperties {
for (Integer partitionId : request.getPartitions()) {
nodePartitions.add(clusterPartitions.get(partitionId));
}
- failedTakeoverRequests.add(request.getId());
+ failedTakeoverRequests.add(request.getRequestId());
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 52e2818..e5f3555 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -29,7 +29,7 @@ import org.apache.asterix.common.transactions.JobId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -59,9 +59,9 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
- IIndexLifecycleManager indexLifeCycleManager =
+ IResourceLifecycleManager indexLifeCycleManager =
txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index c6743dd..f9f2b89 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -29,7 +29,7 @@ import org.apache.asterix.common.transactions.JobId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -55,9 +55,9 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
- IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 8c91c1a..df7d65d 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -29,7 +29,7 @@ import org.apache.asterix.common.transactions.JobId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -53,9 +53,9 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
- IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 3e11531..39e916b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -29,7 +29,7 @@ import org.apache.asterix.common.transactions.JobId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -53,9 +53,9 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
- IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index 5bf1505..85cdc89 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -28,7 +28,7 @@ import org.apache.asterix.common.transactions.JobId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -55,9 +55,9 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
- IIndexLifecycleManager indexLifeCycleManager =
+ IResourceLifecycleManager indexLifeCycleManager =
txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourceName);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
index c3cf3e0..851289e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -18,21 +18,25 @@
*/
package org.apache.asterix.transaction.management.service.recovery;
+import java.io.IOError;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
public class CheckpointThread extends Thread {
+ private static final Logger LOGGER = Logger.getLogger(CheckpointThread.class.getName());
private long lsnThreshold;
private long checkpointTermInSecs;
private final ILogManager logManager;
private final IRecoveryManager recoveryMgr;
- public CheckpointThread(IRecoveryManager recoveryMgr, IIndexLifecycleManager indexLifecycleManager, ILogManager logManager,
+ public CheckpointThread(IRecoveryManager recoveryMgr, ILogManager logManager,
long lsnThreshold, long checkpointTermInSecs) {
this.recoveryMgr = recoveryMgr;
this.logManager = logManager;
@@ -45,24 +49,25 @@ public class CheckpointThread extends Thread {
Thread.currentThread().setName("Checkpoint Thread");
- long currentCheckpointAttemptMinLSN = -1;
+ long currentCheckpointAttemptMinLSN;
long lastCheckpointLSN = -1;
- long currentLogLSN = 0;
- long targetCheckpointLSN = 0;
+ long currentLogLSN;
+ long targetCheckpointLSN;
while (true) {
try {
sleep(checkpointTermInSecs * 1000);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
//ignore
}
-
- if(lastCheckpointLSN == -1)
- {
+ if (lastCheckpointLSN == -1) {
try {
- //Since the system just started up after sharp checkpoint, last checkpoint LSN is considered as the min LSN of the current log partition
+ //Since the system just started up after sharp checkpoint,
+ //last checkpoint LSN is considered as the min LSN of the current log partition
lastCheckpointLSN = logManager.getReadableSmallestLSN();
} catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Error getting smallest readable LSN", e);
lastCheckpointLSN = 0;
}
}
@@ -82,13 +87,12 @@ public class CheckpointThread extends Thread {
currentCheckpointAttemptMinLSN = recoveryMgr.checkpoint(false, targetCheckpointLSN);
//checkpoint was completed at target LSN or above
- if(currentCheckpointAttemptMinLSN >= targetCheckpointLSN)
- {
+ if (currentCheckpointAttemptMinLSN >= targetCheckpointLSN) {
lastCheckpointLSN = currentCheckpointAttemptMinLSN;
}
} catch (ACIDException | HyracksDataException e) {
- throw new Error("failed to checkpoint", e);
+ throw new IOError(e);
}
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 0183b29..eb22c22 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -367,7 +367,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
String resourceAbsolutePath =
partitionIODevicePath + File.separator + localResource.getResourceName();
localResource.setResourcePath(resourceAbsolutePath);
- index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
+ index = (ILSMIndex) datasetLifecycleManager.get(resourceAbsolutePath);
if (index == null) {
//#. create index instance and register to indexLifeCycleManager
localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
@@ -568,7 +568,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
public long getLocalMinFirstLSN() throws HyracksDataException {
IDatasetLifecycleManager datasetLifecycleManager =
txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
- List<IIndex> openIndexList = datasetLifecycleManager.getOpenIndexes();
+ List<IIndex> openIndexList = datasetLifecycleManager.getOpenResources();
long firstLSN;
//the min first lsn can only be the current append or smaller
long minFirstLSN = logMgr.getAppendLSN();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index f7ed355..6b25542 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -81,8 +81,7 @@ public class TransactionSubsystem implements ITransactionSubsystem {
this.recoveryManager = new RecoveryManager(this);
if (asterixAppRuntimeContextProvider != null) {
- this.checkpointThread = new CheckpointThread(recoveryManager,
- asterixAppRuntimeContextProvider.getDatasetLifecycleManager(), logManager,
+ this.checkpointThread = new CheckpointThread(recoveryManager, logManager,
this.txnProperties.getCheckpointLSNThreshold(), this.txnProperties.getCheckpointPollFrequency());
this.checkpointThread.start();
} else {
@@ -95,22 +94,27 @@ public class TransactionSubsystem implements ITransactionSubsystem {
}
}
+ @Override
public ILogManager getLogManager() {
return logManager;
}
+ @Override
public ILockManager getLockManager() {
return lockManager;
}
+ @Override
public ITransactionManager getTransactionManager() {
return transactionManager;
}
+ @Override
public IRecoveryManager getRecoveryManager() {
return recoveryManager;
}
+ @Override
public IAsterixAppRuntimeContextProvider getAsterixAppRuntimeContextProvider() {
return asterixAppRuntimeContextProvider;
}
@@ -119,6 +123,7 @@ public class TransactionSubsystem implements ITransactionSubsystem {
return txnProperties;
}
+ @Override
public String getId() {
return id;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index 3a4a56a..887532c 100644
--- a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++ b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.transaction.management.service.locking;
+import static org.mockito.Mockito.mock;
+
import java.util.concurrent.Executors;
import org.apache.asterix.common.api.AsterixThreadExecutor;
@@ -31,9 +33,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
-
-import static org.mockito.Mockito.mock;
class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider {
@@ -86,11 +85,6 @@ class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider {
}
@Override
- public ResourceIdFactory getResourceIdFactory() {
- throw new UnsupportedOperationException();
- }
-
- @Override
public IIOManager getIOManager() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessage.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessage.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessage.java
index ad19cf0..2f5a873 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessage.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessage.java
@@ -20,9 +20,6 @@ package org.apache.hyracks.api.messages;
import java.io.Serializable;
-/**
- * @author rico
- */
public interface IMessage extends Serializable {
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
index 2b166d2..8aaa563 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
@@ -18,9 +18,6 @@
*/
package org.apache.hyracks.api.messages;
-/**
- * @author rico
- */
public interface IMessageBroker {
public void receivedMessage(IMessage message, String nodeId) throws Exception;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/IndexLifecycleManagerProvider.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/IndexLifecycleManagerProvider.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/IndexLifecycleManagerProvider.java
index 5228f80..960a11c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/IndexLifecycleManagerProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/IndexLifecycleManagerProvider.java
@@ -19,14 +19,14 @@
package org.apache.hyracks.examples.btree.helper;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
public enum IndexLifecycleManagerProvider implements IIndexLifecycleManagerProvider {
INSTANCE;
@Override
- public IIndexLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
+ public IResourceLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
return RuntimeContext.get(ctx).getIndexLifecycleManager();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index 2e96527..a4909ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -24,7 +24,7 @@ import java.util.concurrent.ThreadFactory;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
@@ -46,7 +46,7 @@ public class RuntimeContext {
private IBufferCache bufferCache;
private IFileMapManager fileMapManager;
private ILocalResourceRepository localResourceRepository;
- private IIndexLifecycleManager lcManager;
+ private IResourceLifecycleManager lcManager;
private ResourceIdFactory resourceIdFactory;
private ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(Runnable r) {
@@ -90,7 +90,7 @@ public class RuntimeContext {
return resourceIdFactory;
}
- public IIndexLifecycleManager getIndexLifecycleManager() {
+ public IResourceLifecycleManager getIndexLifecycleManager() {
return lcManager;
}
}