You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2016/04/07 17:00:24 UTC
[49/50] [abbrv] incubator-asterixdb git commit: Merge remote-tracking
branch 'hyracks-local/master' into hyracks-merge2
Merge remote-tracking branch 'hyracks-local/master' into hyracks-merge2
Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/e928b6ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/e928b6ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/e928b6ac
Branch: refs/heads/master
Commit: e928b6acd869c8bccd907c1979ef98ceb9a7c418
Parents: d630d1a 3f84996
Author: Ian Maxon <im...@apache.org>
Authored: Wed Apr 6 19:57:26 2016 -0700
Committer: Ian Maxon <im...@apache.org>
Committed: Wed Apr 6 19:57:26 2016 -0700
----------------------------------------------------------------------
.../control/nc/NodeControllerService.java | 9 ++++-
.../std/file/FileRemoveOperatorDescriptor.java | 26 ++++++++++---
.../lsm/btree/impls/ExternalBTreeOpContext.java | 5 ++-
.../impls/ExternalBTreeWithBuddyOpContext.java | 5 ++-
.../am/lsm/btree/impls/LSMBTreeOpContext.java | 5 ++-
.../storage/am/lsm/common/api/ILSMIndex.java | 6 +++
.../common/api/ILSMIndexOperationContext.java | 7 ++++
.../am/lsm/common/impls/AbstractLSMIndex.java | 25 ++++++------
.../lsm/common/impls/ExternalIndexHarness.java | 27 +++++++++----
.../storage/am/lsm/common/impls/LSMHarness.java | 20 +++++++++-
.../lsm/common/impls/LSMIndexSearchCursor.java | 4 +-
.../impls/LSMInvertedIndexOpContext.java | 5 ++-
.../lsm/rtree/impls/ExternalRTreeOpContext.java | 5 ++-
.../am/lsm/rtree/impls/LSMRTreeOpContext.java | 5 ++-
.../impls/AbstractLSMIndexOperationContext.java | 41 ++++++++++++++++++++
15 files changed, 155 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 7b5758c,0000000..598d6db
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@@ -1,594 -1,0 +1,599 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc;
+
+import java.io.File;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.RuntimeMXBean;
+import java.lang.management.ThreadMXBean;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.api.application.INCApplicationEntryPoint;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.context.IHyracksRootContext;
+import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
+import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
+import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.common.base.IClusterController;
+import org.apache.hyracks.control.common.context.ServerContext;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.common.controllers.NodeParameters;
+import org.apache.hyracks.control.common.controllers.NodeRegistration;
+import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
+import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
+import org.apache.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
+import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
+import org.apache.hyracks.control.common.work.FutureValue;
+import org.apache.hyracks.control.common.work.WorkQueue;
+import org.apache.hyracks.control.nc.application.NCApplicationContext;
+import org.apache.hyracks.control.nc.dataset.DatasetPartitionManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.control.nc.io.profiling.IIOCounter;
+import org.apache.hyracks.control.nc.io.profiling.IOCounterFactory;
+import org.apache.hyracks.control.nc.net.DatasetNetworkManager;
+import org.apache.hyracks.control.nc.net.NetworkManager;
+import org.apache.hyracks.control.nc.partitions.PartitionManager;
+import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
+import org.apache.hyracks.control.nc.runtime.RootHyracksContext;
+import org.apache.hyracks.control.nc.work.AbortTasksWork;
+import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
+import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
+import org.apache.hyracks.control.nc.work.CleanupJobletWork;
+import org.apache.hyracks.control.nc.work.DeployBinaryWork;
+import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
+import org.apache.hyracks.control.nc.work.ShutdownWork;
+import org.apache.hyracks.control.nc.work.StartTasksWork;
+import org.apache.hyracks.control.nc.work.StateDumpWork;
+import org.apache.hyracks.control.nc.work.UnDeployBinaryWork;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.IIPCI;
+import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+
+public class NodeControllerService implements IControllerService {
+ private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
+
+ private static final double MEMORY_FUDGE_FACTOR = 0.8;
+
+ private NCConfig ncConfig;
+
+ private final String id;
+
+ private final IHyracksRootContext ctx;
+
+ private final IPCSystem ipc;
+
+ private final PartitionManager partitionManager;
+
+ private final NetworkManager netManager;
+
+ private IDatasetPartitionManager datasetPartitionManager;
+
+ private DatasetNetworkManager datasetNetworkManager;
+
+ private final WorkQueue queue;
+
+ private final Timer timer;
+
+ private boolean registrationPending;
+
+ private Exception registrationException;
+
+ private IClusterController ccs;
+
+ private final Map<JobId, Joblet> jobletMap;
+
+ private ExecutorService executor;
+
+ private NodeParameters nodeParameters;
+
+ private HeartbeatTask heartbeatTask;
+
+ private final ServerContext serverCtx;
+
+ private NCApplicationContext appCtx;
+
+ private INCApplicationEntryPoint ncAppEntryPoint;
+
+ private final ILifeCycleComponentManager lccm;
+
+ private final MemoryMXBean memoryMXBean;
+
+ private final List<GarbageCollectorMXBean> gcMXBeans;
+
+ private final ThreadMXBean threadMXBean;
+
+ private final RuntimeMXBean runtimeMXBean;
+
+ private final OperatingSystemMXBean osMXBean;
+
+ private final Mutable<FutureValue<Map<String, NodeControllerInfo>>> getNodeControllerInfosAcceptor;
+
+ private final MemoryManager memoryManager;
+
+ private boolean shuttedDown = false;
+
+ private IIOCounter ioCounter;
+
+ public NodeControllerService(NCConfig ncConfig) throws Exception {
+ this.ncConfig = ncConfig;
+ id = ncConfig.nodeId;
+ NodeControllerIPCI ipci = new NodeControllerIPCI();
+ ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort), ipci,
+ new CCNCFunctions.SerializerDeserializer());
+
+ this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices)));
+ if (id == null) {
+ throw new Exception("id not set");
+ }
+ partitionManager = new PartitionManager(this);
+ netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager,
+ ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
+
+ lccm = new LifeCycleComponentManager();
+ queue = new WorkQueue(Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
+ jobletMap = new Hashtable<JobId, Joblet>();
+ timer = new Timer(true);
+ serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
+ new File(new File(NodeControllerService.class.getName()), id));
+ memoryMXBean = ManagementFactory.getMemoryMXBean();
+ gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
+ threadMXBean = ManagementFactory.getThreadMXBean();
+ runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+ osMXBean = ManagementFactory.getOperatingSystemMXBean();
+ registrationPending = true;
+ getNodeControllerInfosAcceptor = new MutableObject<FutureValue<Map<String, NodeControllerInfo>>>();
+ memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
+ ioCounter = new IOCounterFactory().getIOCounter();
+ }
+
+ public IHyracksRootContext getRootContext() {
+ return ctx;
+ }
+
+ public NCApplicationContext getApplicationContext() {
+ return appCtx;
+ }
+
+ public ILifeCycleComponentManager getLifeCycleComponentManager() {
+ return lccm;
+ }
+
+ private static List<IODeviceHandle> getDevices(String ioDevices) {
+ List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
+ StringTokenizer tok = new StringTokenizer(ioDevices, ",");
+ while (tok.hasMoreElements()) {
+ String devPath = tok.nextToken().trim();
+ devices.add(new IODeviceHandle(new File(devPath), "."));
+ }
+ return devices;
+ }
+
+ private synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+ this.nodeParameters = parameters;
+ this.registrationException = exception;
+ this.registrationPending = false;
+ notifyAll();
+ }
+
+ public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+ FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<Map<String, NodeControllerInfo>>();
+ synchronized (getNodeControllerInfosAcceptor) {
+ while (getNodeControllerInfosAcceptor.getValue() != null) {
+ getNodeControllerInfosAcceptor.wait();
+ }
+ getNodeControllerInfosAcceptor.setValue(fv);
+ }
+ ccs.getNodeControllerInfos();
+ return fv.get();
+ }
+
+ private void setNodeControllersInfo(Map<String, NodeControllerInfo> ncInfos) {
+ FutureValue<Map<String, NodeControllerInfo>> fv;
+ synchronized (getNodeControllerInfosAcceptor) {
+ fv = getNodeControllerInfosAcceptor.getValue();
+ getNodeControllerInfosAcceptor.setValue(null);
+ getNodeControllerInfosAcceptor.notifyAll();
+ }
+ fv.setValue(ncInfos);
+ }
+
+ private void init() throws Exception {
+ ctx.getIOManager().setExecutor(executor);
+ datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
+ ncConfig.resultTTL, ncConfig.resultSweepThreshold);
+ datasetNetworkManager = new DatasetNetworkManager(ncConfig.resultIPAddress, ncConfig.resultPort,
+ datasetPartitionManager, ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress,
+ ncConfig.resultPublicPort);
+ }
+
+ @Override
+ public void start() throws Exception {
+ LOGGER.log(Level.INFO, "Starting NodeControllerService");
+ ipc.start();
+ netManager.start();
+
+ startApplication();
+ init();
+
+ datasetNetworkManager.start();
+ IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), -1);
+ this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
+ HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
+ for (int i = 0; i < gcInfos.length; ++i) {
+ gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
+ }
+ HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
+ // Use "public" versions of network addresses and ports
+ NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
+ NetworkAddress netAddress = netManager.getPublicNetworkAddress();
+ if (ncConfig.dataPublicIPAddress != null) {
+ netAddress = new NetworkAddress(ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
+ }
+ ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
+ osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+ runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
+ runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+ runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
+
+ synchronized (this) {
+ while (registrationPending) {
+ wait();
+ }
+ }
+ if (registrationException != null) {
+ throw registrationException;
+ }
+ appCtx.setDistributedState(nodeParameters.getDistributedState());
+
+ queue.start();
+
+ heartbeatTask = new HeartbeatTask(ccs);
+
+ // Use reflection to set the priority of the timer thread.
+ Field threadField = timer.getClass().getDeclaredField("thread");
+ threadField.setAccessible(true);
+ Thread timerThread = (Thread) threadField.get(timer); // The internal timer thread of the Timer object.
+ timerThread.setPriority(Thread.MAX_PRIORITY);
+ // Schedule heartbeat generator.
+ timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
+
+ if (nodeParameters.getProfileDumpPeriod() > 0) {
+ // Schedule profile dump generator.
+ timer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
+ }
+
+ LOGGER.log(Level.INFO, "Started NodeControllerService");
+ if (ncAppEntryPoint != null) {
+ ncAppEntryPoint.notifyStartupComplete();
+ }
+
+ //add JVM shutdown hook
+ Runtime.getRuntime().addShutdownHook(new JVMShutdownHook(this));
+ }
+
+ private void startApplication() throws Exception {
+ appCtx = new NCApplicationContext(this, serverCtx, ctx, id, memoryManager, lccm);
+ String className = ncConfig.appNCMainClass;
+ if (className != null) {
+ Class<?> c = Class.forName(className);
+ ncAppEntryPoint = (INCApplicationEntryPoint) c.newInstance();
+ String[] args = ncConfig.appArgs == null ? new String[0]
+ : ncConfig.appArgs.toArray(new String[ncConfig.appArgs.size()]);
+ ncAppEntryPoint.start(appCtx, args);
+ }
+ executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
+ }
+
+ @Override
+ public synchronized void stop() throws Exception {
+ if (!shuttedDown) {
+ LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+ executor.shutdownNow();
+ if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing shutdown abnormally");
+ }
+ partitionManager.close();
+ datasetPartitionManager.close();
- heartbeatTask.cancel();
+ netManager.stop();
+ datasetNetworkManager.stop();
+ queue.stop();
- if (ncAppEntryPoint != null)
++ if (ncAppEntryPoint != null) {
+ ncAppEntryPoint.stop();
++ }
++ /**
++ * Stop heartbeat after NC has stopped to avoid false node failure detection
++ * on CC if an NC takes a long time to stop.
++ */
++ heartbeatTask.cancel();
+ LOGGER.log(Level.INFO, "Stopped NodeControllerService");
+ shuttedDown = true;
+ }
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public ServerContext getServerContext() {
+ return serverCtx;
+ }
+
+ public Map<JobId, Joblet> getJobletMap() {
+ return jobletMap;
+ }
+
+ public NetworkManager getNetworkManager() {
+ return netManager;
+ }
+
+ public DatasetNetworkManager getDatasetNetworkManager() {
+ return datasetNetworkManager;
+ }
+
+ public PartitionManager getPartitionManager() {
+ return partitionManager;
+ }
+
+ public IClusterController getClusterController() {
+ return ccs;
+ }
+
+ public NodeParameters getNodeParameters() {
+ return nodeParameters;
+ }
+
+ public ExecutorService getExecutorService() {
+ return executor;
+ }
+
+ public NCConfig getConfiguration() {
+ return ncConfig;
+ }
+
+ public WorkQueue getWorkQueue() {
+ return queue;
+ }
+
+ private class HeartbeatTask extends TimerTask {
+ private IClusterController cc;
+
+ private final HeartbeatData hbData;
+
+ public HeartbeatTask(IClusterController cc) {
+ this.cc = cc;
+ hbData = new HeartbeatData();
+ hbData.gcCollectionCounts = new long[gcMXBeans.size()];
+ hbData.gcCollectionTimes = new long[gcMXBeans.size()];
+ }
+
+ @Override
+ public void run() {
+ MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
+ hbData.heapInitSize = heapUsage.getInit();
+ hbData.heapUsedSize = heapUsage.getUsed();
+ hbData.heapCommittedSize = heapUsage.getCommitted();
+ hbData.heapMaxSize = heapUsage.getMax();
+ MemoryUsage nonheapUsage = memoryMXBean.getNonHeapMemoryUsage();
+ hbData.nonheapInitSize = nonheapUsage.getInit();
+ hbData.nonheapUsedSize = nonheapUsage.getUsed();
+ hbData.nonheapCommittedSize = nonheapUsage.getCommitted();
+ hbData.nonheapMaxSize = nonheapUsage.getMax();
+ hbData.threadCount = threadMXBean.getThreadCount();
+ hbData.peakThreadCount = threadMXBean.getPeakThreadCount();
+ hbData.totalStartedThreadCount = threadMXBean.getTotalStartedThreadCount();
+ hbData.systemLoadAverage = osMXBean.getSystemLoadAverage();
+ int gcN = gcMXBeans.size();
+ for (int i = 0; i < gcN; ++i) {
+ GarbageCollectorMXBean gcMXBean = gcMXBeans.get(i);
+ hbData.gcCollectionCounts[i] = gcMXBean.getCollectionCount();
+ hbData.gcCollectionTimes[i] = gcMXBean.getCollectionTime();
+ }
+
+ MuxDemuxPerformanceCounters netPC = netManager.getPerformanceCounters();
+ hbData.netPayloadBytesRead = netPC.getPayloadBytesRead();
+ hbData.netPayloadBytesWritten = netPC.getPayloadBytesWritten();
+ hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
+ hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
+
+ MuxDemuxPerformanceCounters datasetNetPC = datasetNetworkManager.getPerformanceCounters();
+ hbData.datasetNetPayloadBytesRead = datasetNetPC.getPayloadBytesRead();
+ hbData.datasetNetPayloadBytesWritten = datasetNetPC.getPayloadBytesWritten();
+ hbData.datasetNetSignalingBytesRead = datasetNetPC.getSignalingBytesRead();
+ hbData.datasetNetSignalingBytesWritten = datasetNetPC.getSignalingBytesWritten();
+
+ IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
+ hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
+ hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
+ hbData.ipcMessagesReceived = ipcPC.getMessageReceivedCount();
+ hbData.ipcMessageBytesReceived = ipcPC.getMessageBytesReceived();
+
+ hbData.diskReads = ioCounter.getReads();
+ hbData.diskWrites = ioCounter.getWrites();
+
+ try {
+ cc.nodeHeartbeat(id, hbData);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private class ProfileDumpTask extends TimerTask {
+ private IClusterController cc;
+
+ public ProfileDumpTask(IClusterController cc) {
+ this.cc = cc;
+ }
+
+ @Override
+ public void run() {
+ try {
+ FutureValue<List<JobProfile>> fv = new FutureValue<List<JobProfile>>();
+ BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv);
+ queue.scheduleAndSync(bjpw);
+ List<JobProfile> profiles = fv.get();
+ if (!profiles.isEmpty()) {
+ cc.reportProfile(id, profiles);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private final class NodeControllerIPCI implements IIPCI {
+ @Override
+ public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
+ Exception exception) {
+ CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
+ switch (fn.getFunctionId()) {
+ case SEND_APPLICATION_MESSAGE: {
+ CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+ queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
+ amf.getDeploymentId(), amf.getNodeId()));
+ return;
+ }
+ case START_TASKS: {
+ CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
+ queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), stf.getJobId(),
+ stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+ return;
+ }
+
+ case ABORT_TASKS: {
+ CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
+ queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
+ return;
+ }
+
+ case CLEANUP_JOBLET: {
+ CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
+ queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
+ return;
+ }
+
+ case REPORT_PARTITION_AVAILABILITY: {
+ CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
+ queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
+ rpaf.getPartitionId(), rpaf.getNetworkAddress()));
+ return;
+ }
+
+ case NODE_REGISTRATION_RESULT: {
+ CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
+ setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
+ return;
+ }
+
+ case GET_NODE_CONTROLLERS_INFO_RESPONSE: {
+ CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
+ setNodeControllersInfo(gncirf.getNodeControllerInfos());
+ return;
+ }
+
+ case DEPLOY_BINARY: {
+ CCNCFunctions.DeployBinaryFunction ndbf = (CCNCFunctions.DeployBinaryFunction) fn;
+ queue.schedule(new DeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId(),
+ ndbf.getBinaryURLs()));
+ return;
+ }
+
+ case UNDEPLOY_BINARY: {
+ CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn;
+ queue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
+ return;
+ }
+
+ case STATE_DUMP_REQUEST: {
+ final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
+ queue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
+ return;
+ }
+ case SHUTDOWN_REQUEST: {
+ queue.schedule(new ShutdownWork(NodeControllerService.this));
+ return;
+ }
+ }
+ throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+
+ }
+ }
+
+ public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId) throws Exception {
+ ccs.sendApplicationMessageToCC(data, deploymentId, id);
+ }
+
+ public IDatasetPartitionManager getDatasetPartitionManager() {
+ return datasetPartitionManager;
+ }
+
+ /**
+ * Shutdown hook that invokes {@link NodeControllerService#stop() stop} method.
+ */
+ private static class JVMShutdownHook extends Thread {
+
+ private final NodeControllerService nodeControllerService;
+
+ public JVMShutdownHook(NodeControllerService ncAppEntryPoint) {
+ this.nodeControllerService = ncAppEntryPoint;
+ }
+
+ @Override
+ public void run() {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Shutdown hook in progress");
+ }
+ try {
+ nodeControllerService.stop();
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Exception in executing shutdown hook" + e);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
index cf2008c,0000000..c3883e8
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
@@@ -1,88 -1,0 +1,104 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.file;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class FileRemoveOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private final IFileSplitProvider fileSplitProvider;
++ private final boolean quietly;
+
- public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder) {
++ public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder,
++ boolean quietly) {
+ super(spec, 0, 0);
+ this.fileSplitProvider = fileSplitProvder;
++ this.quietly = quietly;
++ }
++
++ /**
++ *
++ * @deprecated use {@link #FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder, boolean quietly)} instead.
++ */
++ @Deprecated
++ public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder) {
++ this(spec, fileSplitProvder, false);
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ final FileSplit split = fileSplitProvider.getFileSplits()[partition];
+ final String path = split.getLocalFile().getFile().getPath();
+ final int deviceId = split.getIODeviceId();
+ final IIOManager ioManager = ctx.getIOManager();
+ return new AbstractOperatorNodePushable() {
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ File f = ioManager.getAbsoluteFileRef(deviceId, path).getFile();
- try {
- FileUtils.deleteDirectory(f);
- } catch (IOException e) {
- throw new HyracksDataException(e);
++ if (quietly) {
++ FileUtils.deleteQuietly(f);
++ } else {
++ try {
++ FileUtils.deleteDirectory(f);
++ } catch (IOException e) {
++ throw new HyracksDataException(e);
++ }
+ }
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public int getInputArity() {
+ return 0;
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java
index d63671e,0000000..29fedef
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java
@@@ -1,145 -1,0 +1,146 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impls;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
- import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
++import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
+
- public class ExternalBTreeOpContext implements ILSMIndexOperationContext {
++public class ExternalBTreeOpContext extends AbstractLSMIndexOperationContext {
+ public ITreeIndexFrameFactory insertLeafFrameFactory;
+ public ITreeIndexFrameFactory deleteLeafFrameFactory;
+ public IBTreeLeafFrame insertLeafFrame;
+ public IBTreeLeafFrame deleteLeafFrame;
+ public IndexOperation op;
+ public final MultiComparator cmp;
+ public final MultiComparator bloomFilterCmp;
+ public final ISearchOperationCallback searchCallback;
+ private final List<ILSMComponent> componentHolder;
+ private final List<ILSMComponent> componentsToBeMerged;
+ private final List<ILSMComponent> componentsToBeReplicated;
+ private final int targetIndexVersion;
+ public ISearchPredicate searchPredicate;
+ public LSMBTreeCursorInitialState searchInitialState;
+
+ public ExternalBTreeOpContext(ITreeIndexFrameFactory insertLeafFrameFactory,
+ ITreeIndexFrameFactory deleteLeafFrameFactory, ISearchOperationCallback searchCallback,
+ int numBloomFilterKeyFields, IBinaryComparatorFactory[] cmpFactories, int targetIndexVersion,
+ ILSMHarness lsmHarness) {
+ if (cmpFactories != null) {
+ this.cmp = MultiComparator.create(cmpFactories);
+ } else {
+ this.cmp = null;
+ }
+ bloomFilterCmp = MultiComparator.create(cmpFactories, 0, numBloomFilterKeyFields);
+ this.insertLeafFrameFactory = insertLeafFrameFactory;
+ this.deleteLeafFrameFactory = deleteLeafFrameFactory;
+ this.insertLeafFrame = (IBTreeLeafFrame) insertLeafFrameFactory.createFrame();
+ this.deleteLeafFrame = (IBTreeLeafFrame) deleteLeafFrameFactory.createFrame();
+ if (insertLeafFrame != null && this.cmp != null) {
+ insertLeafFrame.setMultiComparator(cmp);
+ }
+ if (deleteLeafFrame != null && this.cmp != null) {
+ deleteLeafFrame.setMultiComparator(cmp);
+ }
+ this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
+ this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
+ this.searchCallback = searchCallback;
+ this.targetIndexVersion = targetIndexVersion;
+ searchInitialState = new LSMBTreeCursorInitialState(insertLeafFrameFactory, cmp, bloomFilterCmp, lsmHarness,
+ null, searchCallback, null);
+ }
+
+ @Override
+ public void setOperation(IndexOperation newOp) {
+ reset();
+ this.op = newOp;
+ }
+
+ @Override
+ public void reset() {
++ super.reset();
+ componentHolder.clear();
+ componentsToBeMerged.clear();
+ componentsToBeReplicated.clear();
+ }
+
+ @Override
+ public IndexOperation getOperation() {
+ return op;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentHolder() {
+ return componentHolder;
+ }
+
+ @Override
+ public ISearchOperationCallback getSearchOperationCallback() {
+ return searchCallback;
+ }
+
+ // A disk-only index should never need a modification callback
+ @Override
+ public IModificationOperationCallback getModificationCallback() {
+ return null;
+ }
+
+ @Override
+ public void setCurrentMutableComponentId(int currentMutableComponentId) {
+ // Do nothing: this method should never be called for this class
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
+
+ // Used by indexes with global transaction
+ public int getTargetIndexVersion() {
+ return targetIndexVersion;
+ }
+
+ @Override
+ public void setSearchPredicate(ISearchPredicate searchPredicate) {
+ this.searchPredicate = searchPredicate;
+ }
+
+ @Override
+ public ISearchPredicate getSearchPredicate() {
+ return searchPredicate;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeReplicated() {
+ return componentsToBeReplicated;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
index a837301,0000000..c44f529
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
@@@ -1,134 -1,0 +1,135 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impls;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
- import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
++import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
+
- public class ExternalBTreeWithBuddyOpContext implements ILSMIndexOperationContext {
++public class ExternalBTreeWithBuddyOpContext extends AbstractLSMIndexOperationContext {
+ private IndexOperation op;
+ private MultiComparator bTreeCmp;
+ private MultiComparator buddyBTreeCmp;
+ public final List<ILSMComponent> componentHolder;
+ private final List<ILSMComponent> componentsToBeMerged;
+ private final List<ILSMComponent> componentsToBeReplicated;
+ public final ISearchOperationCallback searchCallback;
+ private final int targetIndexVersion;
+ public ISearchPredicate searchPredicate;
+ public LSMBTreeWithBuddyCursorInitialState searchInitialState;
+
+ public ExternalBTreeWithBuddyOpContext(IBinaryComparatorFactory[] btreeCmpFactories,
+ IBinaryComparatorFactory[] buddyBtreeCmpFactories, ISearchOperationCallback searchCallback,
+ int targetIndexVersion, ILSMHarness lsmHarness, ITreeIndexFrameFactory btreeInteriorFrameFactory,
+ ITreeIndexFrameFactory btreeLeafFrameFactory, ITreeIndexFrameFactory buddyBtreeLeafFrameFactory) {
+ this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
+ this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
+ this.searchCallback = searchCallback;
+ this.targetIndexVersion = targetIndexVersion;
+ this.bTreeCmp = MultiComparator.create(btreeCmpFactories);
+ this.buddyBTreeCmp = MultiComparator.create(buddyBtreeCmpFactories);
+ searchInitialState = new LSMBTreeWithBuddyCursorInitialState(btreeInteriorFrameFactory, btreeLeafFrameFactory,
+ buddyBtreeLeafFrameFactory, lsmHarness, MultiComparator.create(btreeCmpFactories),
+ MultiComparator.create(buddyBtreeCmpFactories), NoOpOperationCallback.INSTANCE, null);
+ }
+
+ @Override
+ public void setOperation(IndexOperation newOp) {
+ reset();
+ this.op = newOp;
+ }
+
+ @Override
+ public void setCurrentMutableComponentId(int currentMutableComponentId) {
+ // Do nothing: this should never be called for disk-only indexes
+ }
+
+ @Override
+ public void reset() {
++ super.reset();
+ componentHolder.clear();
+ componentsToBeMerged.clear();
+ componentsToBeReplicated.clear();
+ }
+
+ @Override
+ public IndexOperation getOperation() {
+ return op;
+ }
+
+ public MultiComparator getBTreeMultiComparator() {
+ return bTreeCmp;
+ }
+
+ public MultiComparator getBuddyBTreeMultiComparator() {
+ return buddyBTreeCmp;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentHolder() {
+ return componentHolder;
+ }
+
+ @Override
+ public ISearchOperationCallback getSearchOperationCallback() {
+ return searchCallback;
+ }
+
+ // This should never be needed for disk only indexes
+ @Override
+ public IModificationOperationCallback getModificationCallback() {
+ return null;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
+
+ public int getTargetIndexVersion() {
+ return targetIndexVersion;
+ }
+
+ @Override
+ public void setSearchPredicate(ISearchPredicate searchPredicate) {
+ this.searchPredicate = searchPredicate;
+ }
+
+ @Override
+ public ISearchPredicate getSearchPredicate() {
+ return searchPredicate;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeReplicated() {
+ return componentsToBeReplicated;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index e833283,0000000..31c9d40
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@@ -1,218 -1,0 +1,219 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.btree.impls;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import org.apache.hyracks.storage.am.btree.impls.BTree;
+import org.apache.hyracks.storage.am.btree.impls.BTreeOpContext;
+import org.apache.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
+import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
- import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
++import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
+
- public final class LSMBTreeOpContext implements ILSMIndexOperationContext {
++public final class LSMBTreeOpContext extends AbstractLSMIndexOperationContext {
+
+ public ITreeIndexFrameFactory insertLeafFrameFactory;
+ public ITreeIndexFrameFactory deleteLeafFrameFactory;
+ public IBTreeLeafFrame insertLeafFrame;
+ public IBTreeLeafFrame deleteLeafFrame;
+ public final BTree[] mutableBTrees;
+ public BTree.BTreeAccessor[] mutableBTreeAccessors;
+ public BTreeOpContext[] mutableBTreeOpCtxs;
+ public BTree.BTreeAccessor currentMutableBTreeAccessor;
+ public BTreeOpContext currentMutableBTreeOpCtx;
+ public IndexOperation op;
+ public final MultiComparator cmp;
+ public final MultiComparator bloomFilterCmp;
+ public IModificationOperationCallback modificationCallback;
+ public ISearchOperationCallback searchCallback;
+ private final List<ILSMComponent> componentHolder;
+ private final List<ILSMComponent> componentsToBeMerged;
+ private final List<ILSMComponent> componentsToBeReplicated;
+ public final PermutingTupleReference indexTuple;
+ public final MultiComparator filterCmp;
+ public final PermutingTupleReference filterTuple;
+ public ISearchPredicate searchPredicate;
+ public BTreeRangeSearchCursor memCursor;
+ public LSMBTreeCursorInitialState searchInitialState;
+ public LSMBTreePointSearchCursor insertSearchCursor;
+
+ public LSMBTreeOpContext(List<ILSMComponent> mutableComponents, ITreeIndexFrameFactory insertLeafFrameFactory,
+ ITreeIndexFrameFactory deleteLeafFrameFactory, IModificationOperationCallback modificationCallback,
+ ISearchOperationCallback searchCallback, int numBloomFilterKeyFields, int[] btreeFields, int[] filterFields,
+ ILSMHarness lsmHarness) {
+ LSMBTreeMemoryComponent c = (LSMBTreeMemoryComponent) mutableComponents.get(0);
+ IBinaryComparatorFactory cmpFactories[] = c.getBTree().getComparatorFactories();
+ if (cmpFactories[0] != null) {
+ this.cmp = MultiComparator.create(c.getBTree().getComparatorFactories());
+ } else {
+ this.cmp = null;
+ }
+
+ bloomFilterCmp = MultiComparator.create(c.getBTree().getComparatorFactories(), 0, numBloomFilterKeyFields);
+
+ mutableBTrees = new BTree[mutableComponents.size()];
+ mutableBTreeAccessors = new BTree.BTreeAccessor[mutableComponents.size()];
+ mutableBTreeOpCtxs = new BTreeOpContext[mutableComponents.size()];
+ for (int i = 0; i < mutableComponents.size(); i++) {
+ LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) mutableComponents.get(i);
+ mutableBTrees[i] = mutableComponent.getBTree();
+ mutableBTreeAccessors[i] = (BTree.BTreeAccessor) mutableBTrees[i].createAccessor(modificationCallback,
+ NoOpOperationCallback.INSTANCE);
+ mutableBTreeOpCtxs[i] = mutableBTreeAccessors[i].getOpContext();
+ }
+
+ this.insertLeafFrameFactory = insertLeafFrameFactory;
+ this.deleteLeafFrameFactory = deleteLeafFrameFactory;
+ this.insertLeafFrame = (IBTreeLeafFrame) insertLeafFrameFactory.createFrame();
+ this.deleteLeafFrame = (IBTreeLeafFrame) deleteLeafFrameFactory.createFrame();
+ if (insertLeafFrame != null && this.cmp != null) {
+ insertLeafFrame.setMultiComparator(cmp);
+ }
+ if (deleteLeafFrame != null && this.cmp != null) {
+ deleteLeafFrame.setMultiComparator(cmp);
+ }
+ this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
+ this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
+ this.modificationCallback = modificationCallback;
+ this.searchCallback = searchCallback;
+
+ if (filterFields != null) {
+ indexTuple = new PermutingTupleReference(btreeFields);
+ filterCmp = MultiComparator.create(c.getLSMComponentFilter().getFilterCmpFactories());
+ filterTuple = new PermutingTupleReference(filterFields);
+ } else {
+ indexTuple = null;
+ filterCmp = null;
+ filterTuple = null;
+ }
+ searchPredicate = new RangePredicate(null, null, true, true, cmp, cmp);
+ if (insertLeafFrame != null) {
+ memCursor = new BTreeRangeSearchCursor(insertLeafFrame, false);
+ }
+
+ searchInitialState = new LSMBTreeCursorInitialState(insertLeafFrameFactory, cmp, bloomFilterCmp, lsmHarness,
+ null, searchCallback, null);
+ insertSearchCursor = new LSMBTreePointSearchCursor(this);
+ }
+
+ @Override
+ public void setOperation(IndexOperation newOp) {
+ reset();
+ this.op = newOp;
+ }
+
+ public void setInsertMode() {
+ currentMutableBTreeOpCtx.leafFrame = insertLeafFrame;
+ currentMutableBTreeOpCtx.leafFrameFactory = insertLeafFrameFactory;
+ }
+
+ public void setDeleteMode() {
+ currentMutableBTreeOpCtx.leafFrame = deleteLeafFrame;
+ currentMutableBTreeOpCtx.leafFrameFactory = deleteLeafFrameFactory;
+ }
+
+ @Override
+ public void reset() {
++ super.reset();
+ componentHolder.clear();
+ componentsToBeMerged.clear();
+ componentsToBeReplicated.clear();
+ }
+
+ @Override
+ public IndexOperation getOperation() {
+ return op;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentHolder() {
+ return componentHolder;
+ }
+
+ @Override
+ public ISearchOperationCallback getSearchOperationCallback() {
+ return searchCallback;
+ }
+
+ @Override
+ public IModificationOperationCallback getModificationCallback() {
+ return modificationCallback;
+ }
+
+ @Override
+ public void setCurrentMutableComponentId(int currentMutableComponentId) {
+ currentMutableBTreeAccessor = mutableBTreeAccessors[currentMutableComponentId];
+ currentMutableBTreeOpCtx = mutableBTreeOpCtxs[currentMutableComponentId];
+ switch (op) {
+ case SEARCH:
+ break;
+ case DISKORDERSCAN:
+ case UPDATE:
+ // Attention: It is important to leave the leafFrame and
+ // leafFrameFactory of the mutableBTree as is when doing an update.
+ // Update will only be set if a previous attempt to delete or
+ // insert failed, so we must preserve the semantics of the
+ // previously requested operation.
+ break;
+ case UPSERT:
+ case INSERT:
+ setInsertMode();
+ break;
+ case PHYSICALDELETE:
+ case DELETE:
+ setDeleteMode();
+ break;
+ }
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
+
+ @Override
+ public void setSearchPredicate(ISearchPredicate searchPredicate) {
+ this.searchPredicate = searchPredicate;
+ }
+
+ @Override
+ public ISearchPredicate getSearchPredicate() {
+ return searchPredicate;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeReplicated() {
+ return componentsToBeReplicated;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index c1cef2d,0000000..11b933d
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@@ -1,54 -1,0 +1,60 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndex;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMHarness;
+
+/**
+ * Methods to be implemented by an LSM index, which are called from {@link LSMHarness}.
+ * The implementations of the methods below should be thread agnostic.
+ * Synchronization of LSM operations like updates/searches/flushes/merges is
+ * done by the {@link LSMHarness}. For example, a flush() implementation should only
+ * create and return the new on-disk component, ignoring the fact that
+ * concurrent searches/updates/merges may be ongoing.
+ */
+public interface ILSMIndex extends IIndex {
+
+ public void deactivate(boolean flushOnExit) throws HyracksDataException;
+
++ @Override
+ public ILSMIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+ ISearchOperationCallback searchCallback) throws HyracksDataException;
+
+ public ILSMOperationTracker getOperationTracker();
+
+ public ILSMIOOperationScheduler getIOScheduler();
+
+ public ILSMIOOperationCallback getIOOperationCallback();
+
+ public List<ILSMComponent> getImmutableComponents();
+
+ public boolean isPrimaryIndex();
++
++ /**
++ * @return true if the index is durable. Otherwise false.
++ */
++ public boolean isDurable();
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index 99f981d,0000000..acf2233
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@@ -1,44 -1,0 +1,51 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.util.List;
+
+import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
+
+public interface ILSMIndexOperationContext extends IIndexOperationContext {
+ public List<ILSMComponent> getComponentHolder();
+
+ public List<ILSMComponent> getComponentsToBeMerged();
+
+ public ISearchOperationCallback getSearchOperationCallback();
+
+ public IModificationOperationCallback getModificationCallback();
+
+ public void setCurrentMutableComponentId(int currentMutableComponentId);
+
+ public void setSearchPredicate(ISearchPredicate searchPredicate);
+
+ public ISearchPredicate getSearchPredicate();
+
+ public List<ILSMComponent> getComponentsToBeReplicated();
++
++ /**
++ * @return true if this operation entered the components. Otherwise false.
++ */
++ public boolean isAccessingComponents();
++
++ public void setAccessingComponents(boolean accessingComponents);
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 441dda1,0000000..440ad31
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@@ -1,296 -1,0 +1,299 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
+import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
- import org.apache.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
- import org.apache.hyracks.storage.common.buffercache.ICachedPage;
- import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.storage.common.file.IFileMapProvider;
+
+public abstract class AbstractLSMIndex implements ILSMIndexInternal { // Shared state for LSM indexes: memory/disk component lists, flush-request flags and replication scheduling.
+ protected final ILSMHarness lsmHarness; // LSMHarness or ExternalIndexHarness, depending on which constructor is used
+
+ protected final ILSMIOOperationScheduler ioScheduler;
+ protected final ILSMIOOperationCallback ioOpCallback;
+
+ // In-memory components (all three fields are null when the external-index constructor is used).
+ protected final List<ILSMComponent> memoryComponents;
+ protected final List<IVirtualBufferCache> virtualBufferCaches;
+ protected AtomicInteger currentMutableComponentId; // index into memoryComponents of the current mutable component
+
+ // On-disk components.
+ protected final IBufferCache diskBufferCache;
+ protected final ILSMIndexFileManager fileManager;
+ protected final IFileMapProvider diskFileMapProvider;
+ protected final List<ILSMComponent> diskComponents; // addComponent() inserts at index 0
+ protected final List<ILSMComponent> inactiveDiskComponents;
+ protected final double bloomFilterFalsePositiveRate;
+ protected final ILSMComponentFilterFrameFactory filterFrameFactory;
+ protected final LSMComponentFilterManager filterManager;
+ protected final int[] filterFields;
+ protected final boolean durable; // when true, markAsValidInternal() calls bufferCache.force(...)
+
+ protected boolean isActivated;
+ protected final AtomicBoolean[] flushRequests; // one flag per virtual buffer cache, indexed by currentMutableComponentId
+ protected boolean memoryComponentsAllocated = false;
+
+ public AbstractLSMIndex(List<IVirtualBufferCache> virtualBufferCaches, IBufferCache diskBufferCache, // constructor for indexes that have memory components
- ILSMIndexFileManager fileManager, IFileMapProvider diskFileMapProvider,
- double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
- ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallback ioOpCallback,
- ILSMComponentFilterFrameFactory filterFrameFactory, LSMComponentFilterManager filterManager,
- int[] filterFields, boolean durable) {
++ ILSMIndexFileManager fileManager, IFileMapProvider diskFileMapProvider, double bloomFilterFalsePositiveRate,
++ ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
++ ILSMIOOperationCallback ioOpCallback, ILSMComponentFilterFrameFactory filterFrameFactory,
++ LSMComponentFilterManager filterManager, int[] filterFields, boolean durable) {
+ this.virtualBufferCaches = virtualBufferCaches;
+ this.diskBufferCache = diskBufferCache;
+ this.diskFileMapProvider = diskFileMapProvider;
+ this.fileManager = fileManager;
+ this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
+ this.ioScheduler = ioScheduler;
+ this.ioOpCallback = ioOpCallback;
+ this.ioOpCallback.setNumOfMutableComponents(virtualBufferCaches.size()); // the callback is told how many mutable components exist
+ this.filterFrameFactory = filterFrameFactory;
+ this.filterManager = filterManager;
+ this.filterFields = filterFields;
+ this.inactiveDiskComponents = new LinkedList<ILSMComponent>();
+ this.durable = durable;
+ lsmHarness = new LSMHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled());
+ isActivated = false;
+ diskComponents = new ArrayList<ILSMComponent>();
+ memoryComponents = new ArrayList<ILSMComponent>(); // starts empty; not populated in this constructor
+ currentMutableComponentId = new AtomicInteger();
+ flushRequests = new AtomicBoolean[virtualBufferCaches.size()];
+ for (int i = 0; i < virtualBufferCaches.size(); i++) { // one flush-request flag per virtual buffer cache
+ flushRequests[i] = new AtomicBoolean();
+ }
+ }
+
+ // The constructor used by external indexes
+ public AbstractLSMIndex(IBufferCache diskBufferCache, ILSMIndexFileManager fileManager,
+ IFileMapProvider diskFileMapProvider, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy,
+ ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallback ioOpCallback,
+ boolean durable) {
+ this.diskBufferCache = diskBufferCache;
+ this.diskFileMapProvider = diskFileMapProvider;
+ this.fileManager = fileManager;
+ this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
+ this.ioScheduler = ioScheduler;
+ this.ioOpCallback = ioOpCallback;
+ this.durable = durable;
+ lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled());
+ isActivated = false;
+ diskComponents = new LinkedList<ILSMComponent>();
+ this.inactiveDiskComponents = new LinkedList<ILSMComponent>();
+ // Memory related objects are nulled (as are the filter-related fields below)
+ this.virtualBufferCaches = null;
+ memoryComponents = null;
+ currentMutableComponentId = null;
+ flushRequests = null;
+ filterFrameFactory = null;
+ filterManager = null;
+ filterFields = null;
+ }
+
+ protected void markAsValidInternal(ITreeIndex treeIndex) throws HyracksDataException { // closes the tree's meta manager and, for durable indexes, calls bufferCache.force on the tree's file
+ int fileId = treeIndex.getFileId();
+ IBufferCache bufferCache = treeIndex.getBufferCache();
+ treeIndex.getMetaManager().close();
+ // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page
+ // won't be flushed to disk because it won't be dirty until the write latch has been released.
+ // Force modified metadata page to disk.
+ // If the index is not durable, then the flush is not necessary.
+ if (durable) {
+ bufferCache.force(fileId, true);
+ }
+ }
+
+ protected void markAsValidInternal(IBufferCache bufferCache, BloomFilter filter) throws HyracksDataException { // for durable indexes, calls force on the bloom filter's file; otherwise does nothing
- if(durable){
- bufferCache.force(filter.getFileId(),true);
++ if (durable) {
++ bufferCache.force(filter.getFileId(), true);
+ }
+ }
+
+ @Override
+ public void addComponent(ILSMComponent c) throws HyracksDataException {
+ diskComponents.add(0, c); // insert at the head of the disk component list
+ }
+
+ @Override
+ public void subsumeMergedComponents(ILSMComponent newComponent, List<ILSMComponent> mergedComponents)
+ throws HyracksDataException {
+ int swapIndex = diskComponents.indexOf(mergedComponents.get(0)); // position of the first merged component; NOTE(review): -1 if absent, and get(0) throws on an empty list - confirm callers guarantee both
+ diskComponents.removeAll(mergedComponents);
+ diskComponents.add(swapIndex, newComponent); // the new component is inserted at the index the first merged component had
+ }
+
+ @Override
+ public void changeMutableComponent() {
+ currentMutableComponentId.set((currentMutableComponentId.get() + 1) % memoryComponents.size()); // advance to the next memory component, wrapping around; NOTE(review): get() then set() is not one atomic update - presumably callers serialize this, confirm
+ ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).setActive(); // setActive() is called on the newly selected component
+ }
+
+ @Override
+ public List<ILSMComponent> getImmutableComponents() {
+ return diskComponents; // returns the live list, not a copy
+ }
+
+ @Override
+ public void changeFlushStatusForCurrentMutableCompoent(boolean needsFlush) { // "Compoent" (sic): this is an @Override, so the spelling is fixed by the overridden declaration
+ flushRequests[currentMutableComponentId.get()].set(needsFlush);
+ }
+
+ @Override
+ public boolean hasFlushRequestForCurrentMutableComponent() {
+ return flushRequests[currentMutableComponentId.get()].get(); // reads the flag written by changeFlushStatusForCurrentMutableCompoent
+ }
+
+ @Override
+ public ILSMOperationTracker getOperationTracker() {
+ return lsmHarness.getOperationTracker(); // delegated to the harness, which received the tracker in the constructor
+ }
+
+ @Override
+ public ILSMIOOperationScheduler getIOScheduler() {
+ return ioScheduler;
+ }
+
+ @Override
+ public ILSMIOOperationCallback getIOOperationCallback() {
+ return ioOpCallback;
+ }
+
+ @Override
+ public IBufferCache getBufferCache() {
+ return diskBufferCache; // the disk buffer cache, not one of the virtual buffer caches
+ }
+
- public boolean isEmptyIndex() throws HyracksDataException {
++ public boolean isEmptyIndex() { // true when there are no disk components and no memory component reports isModified()
+ boolean isModified = false;
+ for (ILSMComponent c : memoryComponents) { // NOTE(review): memoryComponents is null after the external-index constructor - presumably external indexes override this, confirm
+ AbstractMemoryLSMComponent mutableComponent = (AbstractMemoryLSMComponent) c;
+ if (mutableComponent.isModified()) {
+ isModified = true;
+ break;
+ }
+ }
+ return diskComponents.isEmpty() && !isModified;
+ }
+
+ @Override
+ public String toString() {
+ return "LSMIndex [" + fileManager.getBaseDir() + "]";
+ }
+
+ @Override
+ public boolean hasMemoryComponents() {
+ return true; // constant here; NOTE(review): external indexes presumably override this - confirm
+ }
+
+ @Override
+ public boolean isCurrentMutableComponentEmpty() throws HyracksDataException {
+ //check if the current memory component has been modified; "empty" here means isModified() is false
+ return !((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).isModified();
+ }
+
+ public void setCurrentMutableComponentState(ComponentState componentState) { // forwards to setState() on the current mutable memory component
+ ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).setState(componentState);
+ }
+
+ public ComponentState getCurrentMutableComponentState() { // forwards to getState() on the current mutable memory component
+ return ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).getState();
+ }
+
+ public int getCurrentMutableComponentWriterCount() { // forwards to getWriterCount() on the current mutable memory component
+ return ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).getWriterCount();
+ }
+
+ @Override
+ public List<ILSMComponent> getInactiveDiskComponents() {
+ return inactiveDiskComponents; // returns the live list, not a copy
+ }
+
+ @Override
+ public void addInactiveDiskComponent(ILSMComponent diskComponent) {
+ inactiveDiskComponents.add(diskComponent);
+ }
+
+ public abstract Set<String> getLSMComponentPhysicalFiles(ILSMComponent newComponent); // used by scheduleReplication() to collect the files of each component
+
+ @Override
+ public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload,
+ ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException {
+ //accumulates the physical files of all the given components
+ Set<String> componentFiles = new HashSet<String>();
+
+ //get set of files to be replicated for each component
+ for (ILSMComponent lsmComponent : lsmComponents) {
+ componentFiles.addAll(getLSMComponentPhysicalFiles(lsmComponent));
+ }
+
+ ReplicationExecutionType executionType; // SYNC for bulk loads, ASYNC for everything else
+ if (bulkload) {
+ executionType = ReplicationExecutionType.SYNC;
+ } else {
+ executionType = ReplicationExecutionType.ASYNC;
+ }
+
+ //create replication job and submit it
+ LSMIndexReplicationJob job = new LSMIndexReplicationJob(this, ctx, componentFiles, operation, executionType,
+ opType);
+ try {
+ diskBufferCache.getIOReplicationManager().submitJob(job);
+ } catch (IOException e) {
+ throw new HyracksDataException(e); // rethrown as the declared exception type, keeping the IOException as the cause
+ }
+ }
+
++ @Override
+ public abstract void allocateMemoryComponents() throws HyracksDataException;
+
++ @Override
+ public boolean isMemoryComponentsAllocated() {
+ return memoryComponentsAllocated; // never set in this class; NOTE(review): presumably set by subclasses in allocateMemoryComponents() - confirm
+ }
++
++ @Override
++ public boolean isDurable() {
++ return durable;
++ }
+}