You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2016/04/07 17:00:24 UTC

[49/50] [abbrv] incubator-asterixdb git commit: Merge remote-tracking branch 'hyracks-local/master' into hyracks-merge2

Merge remote-tracking branch 'hyracks-local/master' into hyracks-merge2


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/e928b6ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/e928b6ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/e928b6ac

Branch: refs/heads/master
Commit: e928b6acd869c8bccd907c1979ef98ceb9a7c418
Parents: d630d1a 3f84996
Author: Ian Maxon <im...@apache.org>
Authored: Wed Apr 6 19:57:26 2016 -0700
Committer: Ian Maxon <im...@apache.org>
Committed: Wed Apr 6 19:57:26 2016 -0700

----------------------------------------------------------------------
 .../control/nc/NodeControllerService.java       |  9 ++++-
 .../std/file/FileRemoveOperatorDescriptor.java  | 26 ++++++++++---
 .../lsm/btree/impls/ExternalBTreeOpContext.java |  5 ++-
 .../impls/ExternalBTreeWithBuddyOpContext.java  |  5 ++-
 .../am/lsm/btree/impls/LSMBTreeOpContext.java   |  5 ++-
 .../storage/am/lsm/common/api/ILSMIndex.java    |  6 +++
 .../common/api/ILSMIndexOperationContext.java   |  7 ++++
 .../am/lsm/common/impls/AbstractLSMIndex.java   | 25 ++++++------
 .../lsm/common/impls/ExternalIndexHarness.java  | 27 +++++++++----
 .../storage/am/lsm/common/impls/LSMHarness.java | 20 +++++++++-
 .../lsm/common/impls/LSMIndexSearchCursor.java  |  4 +-
 .../impls/LSMInvertedIndexOpContext.java        |  5 ++-
 .../lsm/rtree/impls/ExternalRTreeOpContext.java |  5 ++-
 .../am/lsm/rtree/impls/LSMRTreeOpContext.java   |  5 ++-
 .../impls/AbstractLSMIndexOperationContext.java | 41 ++++++++++++++++++++
 15 files changed, 155 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 7b5758c,0000000..598d6db
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@@ -1,594 -1,0 +1,599 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.hyracks.control.nc;
 +
 +import java.io.File;
 +import java.lang.management.GarbageCollectorMXBean;
 +import java.lang.management.ManagementFactory;
 +import java.lang.management.MemoryMXBean;
 +import java.lang.management.MemoryUsage;
 +import java.lang.management.OperatingSystemMXBean;
 +import java.lang.management.RuntimeMXBean;
 +import java.lang.management.ThreadMXBean;
 +import java.lang.reflect.Field;
 +import java.net.InetSocketAddress;
 +import java.util.ArrayList;
 +import java.util.Hashtable;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.StringTokenizer;
 +import java.util.Timer;
 +import java.util.TimerTask;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.TimeUnit;
 +import java.util.logging.Level;
 +import java.util.logging.Logger;
 +
 +import org.apache.commons.lang3.mutable.Mutable;
 +import org.apache.commons.lang3.mutable.MutableObject;
 +import org.apache.hyracks.api.application.INCApplicationEntryPoint;
 +import org.apache.hyracks.api.client.NodeControllerInfo;
 +import org.apache.hyracks.api.comm.NetworkAddress;
 +import org.apache.hyracks.api.context.IHyracksRootContext;
 +import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 +import org.apache.hyracks.api.deployment.DeploymentId;
 +import org.apache.hyracks.api.io.IODeviceHandle;
 +import org.apache.hyracks.api.job.JobId;
 +import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 +import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 +import org.apache.hyracks.api.service.IControllerService;
 +import org.apache.hyracks.control.common.base.IClusterController;
 +import org.apache.hyracks.control.common.context.ServerContext;
 +import org.apache.hyracks.control.common.controllers.NCConfig;
 +import org.apache.hyracks.control.common.controllers.NodeParameters;
 +import org.apache.hyracks.control.common.controllers.NodeRegistration;
 +import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 +import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 +import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 +import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
 +import org.apache.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
 +import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 +import org.apache.hyracks.control.common.work.FutureValue;
 +import org.apache.hyracks.control.common.work.WorkQueue;
 +import org.apache.hyracks.control.nc.application.NCApplicationContext;
 +import org.apache.hyracks.control.nc.dataset.DatasetPartitionManager;
 +import org.apache.hyracks.control.nc.io.IOManager;
 +import org.apache.hyracks.control.nc.io.profiling.IIOCounter;
 +import org.apache.hyracks.control.nc.io.profiling.IOCounterFactory;
 +import org.apache.hyracks.control.nc.net.DatasetNetworkManager;
 +import org.apache.hyracks.control.nc.net.NetworkManager;
 +import org.apache.hyracks.control.nc.partitions.PartitionManager;
 +import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
 +import org.apache.hyracks.control.nc.runtime.RootHyracksContext;
 +import org.apache.hyracks.control.nc.work.AbortTasksWork;
 +import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
 +import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
 +import org.apache.hyracks.control.nc.work.CleanupJobletWork;
 +import org.apache.hyracks.control.nc.work.DeployBinaryWork;
 +import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 +import org.apache.hyracks.control.nc.work.ShutdownWork;
 +import org.apache.hyracks.control.nc.work.StartTasksWork;
 +import org.apache.hyracks.control.nc.work.StateDumpWork;
 +import org.apache.hyracks.control.nc.work.UnDeployBinaryWork;
 +import org.apache.hyracks.ipc.api.IIPCHandle;
 +import org.apache.hyracks.ipc.api.IIPCI;
 +import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
 +import org.apache.hyracks.ipc.impl.IPCSystem;
 +import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 +
 +public class NodeControllerService implements IControllerService {
 +    private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
 +
 +    private static final double MEMORY_FUDGE_FACTOR = 0.8;
 +
 +    private NCConfig ncConfig;
 +
 +    private final String id;
 +
 +    private final IHyracksRootContext ctx;
 +
 +    private final IPCSystem ipc;
 +
 +    private final PartitionManager partitionManager;
 +
 +    private final NetworkManager netManager;
 +
 +    private IDatasetPartitionManager datasetPartitionManager;
 +
 +    private DatasetNetworkManager datasetNetworkManager;
 +
 +    private final WorkQueue queue;
 +
 +    private final Timer timer;
 +
 +    private boolean registrationPending;
 +
 +    private Exception registrationException;
 +
 +    private IClusterController ccs;
 +
 +    private final Map<JobId, Joblet> jobletMap;
 +
 +    private ExecutorService executor;
 +
 +    private NodeParameters nodeParameters;
 +
 +    private HeartbeatTask heartbeatTask;
 +
 +    private final ServerContext serverCtx;
 +
 +    private NCApplicationContext appCtx;
 +
 +    private INCApplicationEntryPoint ncAppEntryPoint;
 +
 +    private final ILifeCycleComponentManager lccm;
 +
 +    private final MemoryMXBean memoryMXBean;
 +
 +    private final List<GarbageCollectorMXBean> gcMXBeans;
 +
 +    private final ThreadMXBean threadMXBean;
 +
 +    private final RuntimeMXBean runtimeMXBean;
 +
 +    private final OperatingSystemMXBean osMXBean;
 +
 +    private final Mutable<FutureValue<Map<String, NodeControllerInfo>>> getNodeControllerInfosAcceptor;
 +
 +    private final MemoryManager memoryManager;
 +
 +    private boolean shuttedDown = false;
 +
 +    private IIOCounter ioCounter;
 +
 +    /**
 +     * Builds a node controller from the given configuration: the IPC system, root context,
 +     * partition and network managers, work queue, heartbeat timer, server context, JVM
 +     * management beans, memory manager and IO counter.
 +     *
 +     * @param ncConfig
 +     *            node controller configuration; its {@code nodeId} must not be null
 +     * @throws Exception
 +     *             if the node id is not set, or if constructing one of the subsystems fails
 +     */
 +    public NodeControllerService(NCConfig ncConfig) throws Exception {
 +        this.ncConfig = ncConfig;
 +        id = ncConfig.nodeId;
 +        // Validate the node id first so that a misconfigured node fails before the IPC system
 +        // and the IO manager are constructed.
 +        if (id == null) {
 +            throw new Exception("id not set");
 +        }
 +        NodeControllerIPCI ipci = new NodeControllerIPCI();
 +        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort), ipci,
 +                new CCNCFunctions.SerializerDeserializer());
 +
 +        this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices)));
 +        partitionManager = new PartitionManager(this);
 +        netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager,
 +                ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
 +
 +        lccm = new LifeCycleComponentManager();
 +        // The work queue runs at NORM_PRIORITY; MAX_PRIORITY is reserved for the heartbeat (timer) thread.
 +        queue = new WorkQueue(Thread.NORM_PRIORITY);
 +        jobletMap = new Hashtable<JobId, Joblet>();
 +        // Daemon timer: it must not keep the JVM alive on its own.
 +        timer = new Timer(true);
 +        serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
 +                new File(new File(NodeControllerService.class.getName()), id));
 +        memoryMXBean = ManagementFactory.getMemoryMXBean();
 +        gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
 +        threadMXBean = ManagementFactory.getThreadMXBean();
 +        runtimeMXBean = ManagementFactory.getRuntimeMXBean();
 +        osMXBean = ManagementFactory.getOperatingSystemMXBean();
 +        registrationPending = true;
 +        getNodeControllerInfosAcceptor = new MutableObject<FutureValue<Map<String, NodeControllerInfo>>>();
 +        // Hand the memory manager only a fraction (MEMORY_FUDGE_FACTOR) of the maximum heap size.
 +        memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
 +        ioCounter = new IOCounterFactory().getIOCounter();
 +    }
 +
 +    public IHyracksRootContext getRootContext() {
 +        return ctx;
 +    }
 +
 +    public NCApplicationContext getApplicationContext() {
 +        return appCtx;
 +    }
 +
 +    public ILifeCycleComponentManager getLifeCycleComponentManager() {
 +        return lccm;
 +    }
 +
 +    /**
 +     * Turns a comma-separated list of paths into IO device handles.
 +     * Every token is trimmed and wrapped in an {@link IODeviceHandle} whose second
 +     * constructor argument is {@code "."}.
 +     *
 +     * @param ioDevices
 +     *            comma-separated device paths
 +     * @return one handle per token, in the order the tokens appear
 +     */
 +    private static List<IODeviceHandle> getDevices(String ioDevices) {
 +        List<IODeviceHandle> handles = new ArrayList<IODeviceHandle>();
 +        for (StringTokenizer paths = new StringTokenizer(ioDevices, ","); paths.hasMoreTokens();) {
 +            File location = new File(paths.nextToken().trim());
 +            handles.add(new IODeviceHandle(location, "."));
 +        }
 +        return handles;
 +    }
 +
 +    /**
 +     * Stores the outcome of the node registration, marks the registration as no longer
 +     * pending and wakes every thread waiting on this object's monitor.
 +     *
 +     * @param parameters
 +     *            node parameters carried by the registration result
 +     * @param exception
 +     *            exception carried by the registration result, stored as-is
 +     */
 +    private synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
 +        registrationException = exception;
 +        nodeParameters = parameters;
 +        registrationPending = false;
 +        this.notifyAll();
 +    }
 +
 +    /**
 +     * Requests the node controller infos from the cluster controller and blocks until the
 +     * response has been delivered through {@code setNodeControllersInfo}.
 +     * Only one request is outstanding at a time: callers wait until the shared acceptor slot
 +     * is free before installing their own future.
 +     *
 +     * @return the node controller infos supplied by the response
 +     * @throws Exception
 +     *             if sending the request fails or waiting for the response fails
 +     */
 +    public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
 +        FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<Map<String, NodeControllerInfo>>();
 +        synchronized (getNodeControllerInfosAcceptor) {
 +            while (getNodeControllerInfosAcceptor.getValue() != null) {
 +                getNodeControllerInfosAcceptor.wait();
 +            }
 +            getNodeControllerInfosAcceptor.setValue(fv);
 +        }
 +        try {
 +            ccs.getNodeControllerInfos();
 +        } catch (Exception e) {
 +            // The request never went out, so no response will clear the slot. Release it here,
 +            // otherwise every subsequent caller would block forever in the wait() loop above.
 +            synchronized (getNodeControllerInfosAcceptor) {
 +                if (getNodeControllerInfosAcceptor.getValue() == fv) {
 +                    getNodeControllerInfosAcceptor.setValue(null);
 +                    getNodeControllerInfosAcceptor.notifyAll();
 +                }
 +            }
 +            throw e;
 +        }
 +        return fv.get();
 +    }
 +
 +    /**
 +     * Completes the pending {@code getNodeControllersInfo} request with the given infos and
 +     * frees the acceptor slot so the next waiting caller can proceed.
 +     * A response that arrives while no request is pending is logged and ignored.
 +     *
 +     * @param ncInfos
 +     *            node controller infos carried by the response
 +     */
 +    private void setNodeControllersInfo(Map<String, NodeControllerInfo> ncInfos) {
 +        FutureValue<Map<String, NodeControllerInfo>> fv;
 +        synchronized (getNodeControllerInfosAcceptor) {
 +            fv = getNodeControllerInfosAcceptor.getValue();
 +            getNodeControllerInfosAcceptor.setValue(null);
 +            getNodeControllerInfosAcceptor.notifyAll();
 +        }
 +        if (fv == null) {
 +            // Nobody is waiting for this response; dereferencing fv would throw a
 +            // NullPointerException on the thread that delivers IPC messages.
 +            LOGGER.log(Level.WARNING, "Ignoring node controllers info response: no request is pending");
 +            return;
 +        }
 +        fv.setValue(ncInfos);
 +    }
 +
 +    /**
 +     * Wires the executor into the IO manager and creates the dataset partition manager and
 +     * the dataset network manager from the result-related settings of the configuration.
 +     *
 +     * @throws Exception
 +     *             if one of the managers cannot be created
 +     */
 +    private void init() throws Exception {
 +        this.ctx.getIOManager().setExecutor(this.executor);
 +        this.datasetPartitionManager = new DatasetPartitionManager(this, this.executor,
 +                this.ncConfig.resultManagerMemory, this.ncConfig.resultTTL, this.ncConfig.resultSweepThreshold);
 +        this.datasetNetworkManager = new DatasetNetworkManager(this.ncConfig.resultIPAddress,
 +                this.ncConfig.resultPort, this.datasetPartitionManager, this.ncConfig.nNetThreads,
 +                this.ncConfig.nNetBuffers, this.ncConfig.resultPublicIPAddress, this.ncConfig.resultPublicPort);
 +    }
 +
 +    /**
 +     * Starts the node controller: brings up IPC and networking, starts the application,
 +     * registers this node with the cluster controller and waits for the registration result,
 +     * then starts the work queue and schedules the heartbeat (and, if configured, the
 +     * profile dump) on the timer. Finally notifies the application entry point, if any, and
 +     * installs a JVM shutdown hook.
 +     *
 +     * @throws Exception
 +     *             if any subsystem fails to start, or the exception reported by the
 +     *             registration result if it is not null
 +     */
 +    @Override
 +    public void start() throws Exception {
 +        LOGGER.log(Level.INFO, "Starting NodeControllerService");
 +        ipc.start();
 +        netManager.start();
 +
 +        // startApplication() must run before init(): it creates the executor that init()
 +        // hands to the IO manager and the dataset partition manager.
 +        startApplication();
 +        init();
 +
 +        datasetNetworkManager.start();
 +        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), -1);
 +        this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
 +        // The heartbeat schema carries one entry per garbage collector, identified by name.
 +        HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
 +        for (int i = 0; i < gcInfos.length; ++i) {
 +            gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
 +        }
 +        HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
 +        // Use "public" versions of network addresses and ports
 +        NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
 +        NetworkAddress netAddress = netManager.getPublicNetworkAddress();
 +        if (ncConfig.dataPublicIPAddress != null) {
 +            netAddress = new NetworkAddress(ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
 +        }
 +        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
 +                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
 +                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
 +                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
 +                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
 +
 +        // Block until setNodeRegistrationResult() clears registrationPending and notifies this monitor.
 +        synchronized (this) {
 +            while (registrationPending) {
 +                wait();
 +            }
 +        }
 +        if (registrationException != null) {
 +            throw registrationException;
 +        }
 +        appCtx.setDistributedState(nodeParameters.getDistributedState());
 +
 +        queue.start();
 +
 +        heartbeatTask = new HeartbeatTask(ccs);
 +
 +        // Use reflection to set the priority of the timer thread.
 +        Field threadField = timer.getClass().getDeclaredField("thread");
 +        threadField.setAccessible(true);
 +        Thread timerThread = (Thread) threadField.get(timer); // The internal timer thread of the Timer object.
 +        timerThread.setPriority(Thread.MAX_PRIORITY);
 +        // Schedule heartbeat generator.
 +        timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
 +
 +        if (nodeParameters.getProfileDumpPeriod() > 0) {
 +            // Schedule profile dump generator.
 +            timer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
 +        }
 +
 +        LOGGER.log(Level.INFO, "Started NodeControllerService");
 +        if (ncAppEntryPoint != null) {
 +            ncAppEntryPoint.notifyStartupComplete();
 +        }
 +
 +        //add JVM shutdown hook
 +        Runtime.getRuntime().addShutdownHook(new JVMShutdownHook(this));
 +    }
 +
 +    private void startApplication() throws Exception {
 +        appCtx = new NCApplicationContext(this, serverCtx, ctx, id, memoryManager, lccm);
 +        String className = ncConfig.appNCMainClass;
 +        if (className != null) {
 +            Class<?> c = Class.forName(className);
 +            ncAppEntryPoint = (INCApplicationEntryPoint) c.newInstance();
 +            String[] args = ncConfig.appArgs == null ? new String[0]
 +                    : ncConfig.appArgs.toArray(new String[ncConfig.appArgs.size()]);
 +            ncAppEntryPoint.start(appCtx, args);
 +        }
 +        executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
 +    }
 +
 +    @Override
 +    public synchronized void stop() throws Exception {
 +        if (!shuttedDown) {
 +            LOGGER.log(Level.INFO, "Stopping NodeControllerService");
 +            executor.shutdownNow();
 +            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
 +                LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing shutdown abnormally");
 +            }
 +            partitionManager.close();
 +            datasetPartitionManager.close();
-             heartbeatTask.cancel();
 +            netManager.stop();
 +            datasetNetworkManager.stop();
 +            queue.stop();
-             if (ncAppEntryPoint != null)
++            if (ncAppEntryPoint != null) {
 +                ncAppEntryPoint.stop();
++            }
++            /**
++             * Stop heartbeat after NC has stopped to avoid false node failure detection
++             * on CC if an NC takes a long time to stop.
++             */
++            heartbeatTask.cancel();
 +            LOGGER.log(Level.INFO, "Stopped NodeControllerService");
 +            shuttedDown = true;
 +        }
 +    }
 +
 +    public String getId() {
 +        return id;
 +    }
 +
 +    public ServerContext getServerContext() {
 +        return serverCtx;
 +    }
 +
 +    public Map<JobId, Joblet> getJobletMap() {
 +        return jobletMap;
 +    }
 +
 +    public NetworkManager getNetworkManager() {
 +        return netManager;
 +    }
 +
 +    public DatasetNetworkManager getDatasetNetworkManager() {
 +        return datasetNetworkManager;
 +    }
 +
 +    public PartitionManager getPartitionManager() {
 +        return partitionManager;
 +    }
 +
 +    public IClusterController getClusterController() {
 +        return ccs;
 +    }
 +
 +    public NodeParameters getNodeParameters() {
 +        return nodeParameters;
 +    }
 +
 +    public ExecutorService getExecutorService() {
 +        return executor;
 +    }
 +
 +    public NCConfig getConfiguration() {
 +        return ncConfig;
 +    }
 +
 +    public WorkQueue getWorkQueue() {
 +        return queue;
 +    }
 +
 +    /**
 +     * Timer task that fills a reusable {@link HeartbeatData} with JVM, network, IPC and disk
 +     * counters and sends it to the cluster controller.
 +     */
 +    private class HeartbeatTask extends TimerTask {
 +        private final IClusterController cc;
 +
 +        // Single instance, overwritten on every run.
 +        private final HeartbeatData hbData;
 +
 +        public HeartbeatTask(IClusterController cc) {
 +            this.cc = cc;
 +            hbData = new HeartbeatData();
 +            hbData.gcCollectionCounts = new long[gcMXBeans.size()];
 +            hbData.gcCollectionTimes = new long[gcMXBeans.size()];
 +        }
 +
 +        @Override
 +        public void run() {
 +            MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
 +            hbData.heapInitSize = heapUsage.getInit();
 +            hbData.heapUsedSize = heapUsage.getUsed();
 +            hbData.heapCommittedSize = heapUsage.getCommitted();
 +            hbData.heapMaxSize = heapUsage.getMax();
 +            MemoryUsage nonheapUsage = memoryMXBean.getNonHeapMemoryUsage();
 +            hbData.nonheapInitSize = nonheapUsage.getInit();
 +            hbData.nonheapUsedSize = nonheapUsage.getUsed();
 +            hbData.nonheapCommittedSize = nonheapUsage.getCommitted();
 +            hbData.nonheapMaxSize = nonheapUsage.getMax();
 +            hbData.threadCount = threadMXBean.getThreadCount();
 +            hbData.peakThreadCount = threadMXBean.getPeakThreadCount();
 +            hbData.totalStartedThreadCount = threadMXBean.getTotalStartedThreadCount();
 +            hbData.systemLoadAverage = osMXBean.getSystemLoadAverage();
 +            int gcN = gcMXBeans.size();
 +            for (int i = 0; i < gcN; ++i) {
 +                GarbageCollectorMXBean gcMXBean = gcMXBeans.get(i);
 +                hbData.gcCollectionCounts[i] = gcMXBean.getCollectionCount();
 +                hbData.gcCollectionTimes[i] = gcMXBean.getCollectionTime();
 +            }
 +
 +            MuxDemuxPerformanceCounters netPC = netManager.getPerformanceCounters();
 +            hbData.netPayloadBytesRead = netPC.getPayloadBytesRead();
 +            hbData.netPayloadBytesWritten = netPC.getPayloadBytesWritten();
 +            hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
 +            hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
 +
 +            MuxDemuxPerformanceCounters datasetNetPC = datasetNetworkManager.getPerformanceCounters();
 +            hbData.datasetNetPayloadBytesRead = datasetNetPC.getPayloadBytesRead();
 +            hbData.datasetNetPayloadBytesWritten = datasetNetPC.getPayloadBytesWritten();
 +            hbData.datasetNetSignalingBytesRead = datasetNetPC.getSignalingBytesRead();
 +            hbData.datasetNetSignalingBytesWritten = datasetNetPC.getSignalingBytesWritten();
 +
 +            IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
 +            hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
 +            hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
 +            hbData.ipcMessagesReceived = ipcPC.getMessageReceivedCount();
 +            hbData.ipcMessageBytesReceived = ipcPC.getMessageBytesReceived();
 +
 +            hbData.diskReads = ioCounter.getReads();
 +            hbData.diskWrites = ioCounter.getWrites();
 +
 +            try {
 +                cc.nodeHeartbeat(id, hbData);
 +            } catch (Exception e) {
 +                // Catch and log so the exception does not propagate out of the timer task;
 +                // route it through the class logger instead of stderr.
 +                LOGGER.log(Level.WARNING, "Exception sending heartbeat", e);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Timer task that builds the job profiles on the work queue and reports them to the
 +     * cluster controller when there is at least one.
 +     */
 +    private class ProfileDumpTask extends TimerTask {
 +        private final IClusterController cc;
 +
 +        public ProfileDumpTask(IClusterController cc) {
 +            this.cc = cc;
 +        }
 +
 +        @Override
 +        public void run() {
 +            try {
 +                FutureValue<List<JobProfile>> fv = new FutureValue<List<JobProfile>>();
 +                BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv);
 +                queue.scheduleAndSync(bjpw);
 +                List<JobProfile> profiles = fv.get();
 +                if (!profiles.isEmpty()) {
 +                    cc.reportProfile(id, profiles);
 +                }
 +            } catch (Exception e) {
 +                // Catch and log so the exception does not propagate out of the timer task;
 +                // route it through the class logger instead of stderr.
 +                LOGGER.log(Level.WARNING, "Exception reporting job profiles", e);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * IPC callback that casts each incoming payload to a {@link CCNCFunctions.Function} and
 +     * dispatches on its function id. Most functions are turned into work items on the work
 +     * queue; registration results and node-controller-info responses are handled inline.
 +     */
 +    private final class NodeControllerIPCI implements IIPCI {
 +        /**
 +         * Dispatches one incoming message.
 +         *
 +         * @throws IllegalArgumentException
 +         *             if the function id is not one of the handled cases
 +         */
 +        @Override
 +        public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
 +                Exception exception) {
 +            final NodeControllerService ncs = NodeControllerService.this;
 +            final CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
 +            switch (fn.getFunctionId()) {
 +                case SEND_APPLICATION_MESSAGE: {
 +                    CCNCFunctions.SendApplicationMessageFunction request = (CCNCFunctions.SendApplicationMessageFunction) fn;
 +                    queue.schedule(new ApplicationMessageWork(ncs, request.getMessage(), request.getDeploymentId(),
 +                            request.getNodeId()));
 +                    break;
 +                }
 +                case START_TASKS: {
 +                    CCNCFunctions.StartTasksFunction request = (CCNCFunctions.StartTasksFunction) fn;
 +                    queue.schedule(new StartTasksWork(ncs, request.getDeploymentId(), request.getJobId(),
 +                            request.getPlanBytes(), request.getTaskDescriptors(), request.getConnectorPolicies(),
 +                            request.getFlags()));
 +                    break;
 +                }
 +                case ABORT_TASKS: {
 +                    CCNCFunctions.AbortTasksFunction request = (CCNCFunctions.AbortTasksFunction) fn;
 +                    queue.schedule(new AbortTasksWork(ncs, request.getJobId(), request.getTasks()));
 +                    break;
 +                }
 +                case CLEANUP_JOBLET: {
 +                    CCNCFunctions.CleanupJobletFunction request = (CCNCFunctions.CleanupJobletFunction) fn;
 +                    queue.schedule(new CleanupJobletWork(ncs, request.getJobId(), request.getStatus()));
 +                    break;
 +                }
 +                case REPORT_PARTITION_AVAILABILITY: {
 +                    CCNCFunctions.ReportPartitionAvailabilityFunction request = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
 +                    queue.schedule(new ReportPartitionAvailabilityWork(ncs, request.getPartitionId(),
 +                            request.getNetworkAddress()));
 +                    break;
 +                }
 +                case NODE_REGISTRATION_RESULT: {
 +                    // Handled inline: this releases the thread blocked in start().
 +                    CCNCFunctions.NodeRegistrationResult result = (CCNCFunctions.NodeRegistrationResult) fn;
 +                    setNodeRegistrationResult(result.getNodeParameters(), result.getException());
 +                    break;
 +                }
 +                case GET_NODE_CONTROLLERS_INFO_RESPONSE: {
 +                    // Handled inline: this completes the future of the pending getNodeControllersInfo() call.
 +                    CCNCFunctions.GetNodeControllersInfoResponseFunction response = (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
 +                    setNodeControllersInfo(response.getNodeControllerInfos());
 +                    break;
 +                }
 +                case DEPLOY_BINARY: {
 +                    CCNCFunctions.DeployBinaryFunction request = (CCNCFunctions.DeployBinaryFunction) fn;
 +                    queue.schedule(new DeployBinaryWork(ncs, request.getDeploymentId(), request.getBinaryURLs()));
 +                    break;
 +                }
 +                case UNDEPLOY_BINARY: {
 +                    CCNCFunctions.UnDeployBinaryFunction request = (CCNCFunctions.UnDeployBinaryFunction) fn;
 +                    queue.schedule(new UnDeployBinaryWork(ncs, request.getDeploymentId()));
 +                    break;
 +                }
 +                case STATE_DUMP_REQUEST: {
 +                    final CCNCFunctions.StateDumpRequestFunction request = (StateDumpRequestFunction) fn;
 +                    queue.schedule(new StateDumpWork(ncs, request.getStateDumpId()));
 +                    break;
 +                }
 +                case SHUTDOWN_REQUEST: {
 +                    queue.schedule(new ShutdownWork(ncs));
 +                    break;
 +                }
 +                default:
 +                    throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
 +            }
 +        }
 +    }
 +
 +    public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId) throws Exception {
 +        ccs.sendApplicationMessageToCC(data, deploymentId, id);
 +    }
 +
 +    public IDatasetPartitionManager getDatasetPartitionManager() {
 +        return datasetPartitionManager;
 +    }
 +
 +    /**
 +     * Shutdown hook that invokes {@link NodeControllerService#stop() stop} on the node
 +     * controller it was created for. Exceptions raised by {@code stop()} are logged and
 +     * not propagated.
 +     */
 +    private static class JVMShutdownHook extends Thread {
 +
 +        private final NodeControllerService nodeControllerService;
 +
 +        /**
 +         * @param nodeControllerService
 +         *            the node controller to stop when the JVM shuts down
 +         */
 +        public JVMShutdownHook(NodeControllerService nodeControllerService) {
 +            this.nodeControllerService = nodeControllerService;
 +        }
 +
 +        @Override
 +        public void run() {
 +            if (LOGGER.isLoggable(Level.INFO)) {
 +                LOGGER.info("Shutdown hook in progress");
 +            }
 +            try {
 +                nodeControllerService.stop();
 +            } catch (Exception e) {
 +                // Pass the exception to the logger so its stack trace is kept, instead of
 +                // concatenating it onto the message.
 +                LOGGER.log(Level.WARNING, "Exception in executing shutdown hook", e);
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
index cf2008c,0000000..c3883e8
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
@@@ -1,88 -1,0 +1,104 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.hyracks.dataflow.std.file;
 +
 +import java.io.File;
 +import java.io.IOException;
 +
 +import org.apache.commons.io.FileUtils;
 +import org.apache.hyracks.api.comm.IFrameWriter;
 +import org.apache.hyracks.api.context.IHyracksTaskContext;
 +import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 +import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.api.io.IIOManager;
 +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 +import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
 +import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 +
 +public class FileRemoveOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 +
 +    private final IFileSplitProvider fileSplitProvider;
++    private final boolean quietly;
 +
-     public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder) {
++    public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder,
++            boolean quietly) {
 +        super(spec, 0, 0);
 +        this.fileSplitProvider = fileSplitProvder;
++        this.quietly = quietly;
++    }
++
++    /**
++     *
++     * @deprecated use {@link #FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder, boolean quietly)} instead.
++     */
++    @Deprecated
++    public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder) {
++        this(spec, fileSplitProvder, false);
 +    }
 +
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
 +            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
 +        final FileSplit split = fileSplitProvider.getFileSplits()[partition];
 +        final String path = split.getLocalFile().getFile().getPath();
 +        final int deviceId = split.getIODeviceId();
 +        final IIOManager ioManager = ctx.getIOManager();
 +        return new AbstractOperatorNodePushable() {
 +
 +            @Override
 +            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
 +                throw new IllegalStateException();
 +            }
 +
 +            @Override
 +            public void initialize() throws HyracksDataException {
 +                File f = ioManager.getAbsoluteFileRef(deviceId, path).getFile();
-                 try {
-                     FileUtils.deleteDirectory(f);
-                 } catch (IOException e) {
-                     throw new HyracksDataException(e);
++                if (quietly) {
++                    FileUtils.deleteQuietly(f);
++                } else {
++                    try {
++                        FileUtils.deleteDirectory(f);
++                    } catch (IOException e) {
++                        throw new HyracksDataException(e);
++                    }
 +                }
 +            }
 +
 +            @Override
 +            public IFrameWriter getInputFrameWriter(int index) {
 +                throw new IllegalStateException();
 +            }
 +
 +            @Override
 +            public int getInputArity() {
 +                return 0;
 +            }
 +
 +            @Override
 +            public void deinitialize() throws HyracksDataException {
 +            }
 +        };
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java
index d63671e,0000000..29fedef
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java
@@@ -1,145 -1,0 +1,146 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.hyracks.storage.am.lsm.btree.impls;
 +
 +import java.util.LinkedList;
 +import java.util.List;
 +
 +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 +import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
 +import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
 +import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 +import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
- import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
++import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
 +
- public class ExternalBTreeOpContext implements ILSMIndexOperationContext {
++public class ExternalBTreeOpContext extends AbstractLSMIndexOperationContext {
 +    public ITreeIndexFrameFactory insertLeafFrameFactory;
 +    public ITreeIndexFrameFactory deleteLeafFrameFactory;
 +    public IBTreeLeafFrame insertLeafFrame;
 +    public IBTreeLeafFrame deleteLeafFrame;
 +    public IndexOperation op;
 +    public final MultiComparator cmp;
 +    public final MultiComparator bloomFilterCmp;
 +    public final ISearchOperationCallback searchCallback;
 +    private final List<ILSMComponent> componentHolder;
 +    private final List<ILSMComponent> componentsToBeMerged;
 +    private final List<ILSMComponent> componentsToBeReplicated;
 +    private final int targetIndexVersion;
 +    public ISearchPredicate searchPredicate;
 +    public LSMBTreeCursorInitialState searchInitialState;
 +
 +    public ExternalBTreeOpContext(ITreeIndexFrameFactory insertLeafFrameFactory,
 +            ITreeIndexFrameFactory deleteLeafFrameFactory, ISearchOperationCallback searchCallback,
 +            int numBloomFilterKeyFields, IBinaryComparatorFactory[] cmpFactories, int targetIndexVersion,
 +            ILSMHarness lsmHarness) {
 +        if (cmpFactories != null) {
 +            this.cmp = MultiComparator.create(cmpFactories);
 +        } else {
 +            this.cmp = null;
 +        }
 +        bloomFilterCmp = MultiComparator.create(cmpFactories, 0, numBloomFilterKeyFields);
 +        this.insertLeafFrameFactory = insertLeafFrameFactory;
 +        this.deleteLeafFrameFactory = deleteLeafFrameFactory;
 +        this.insertLeafFrame = (IBTreeLeafFrame) insertLeafFrameFactory.createFrame();
 +        this.deleteLeafFrame = (IBTreeLeafFrame) deleteLeafFrameFactory.createFrame();
 +        if (insertLeafFrame != null && this.cmp != null) {
 +            insertLeafFrame.setMultiComparator(cmp);
 +        }
 +        if (deleteLeafFrame != null && this.cmp != null) {
 +            deleteLeafFrame.setMultiComparator(cmp);
 +        }
 +        this.componentHolder = new LinkedList<ILSMComponent>();
 +        this.componentsToBeMerged = new LinkedList<ILSMComponent>();
 +        this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
 +        this.searchCallback = searchCallback;
 +        this.targetIndexVersion = targetIndexVersion;
 +        searchInitialState = new LSMBTreeCursorInitialState(insertLeafFrameFactory, cmp, bloomFilterCmp, lsmHarness,
 +                null, searchCallback, null);
 +    }
 +
 +    @Override
 +    public void setOperation(IndexOperation newOp) {
 +        reset();
 +        this.op = newOp;
 +    }
 +
 +    @Override
 +    public void reset() {
++        super.reset();
 +        componentHolder.clear();
 +        componentsToBeMerged.clear();
 +        componentsToBeReplicated.clear();
 +    }
 +
 +    @Override
 +    public IndexOperation getOperation() {
 +        return op;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentHolder() {
 +        return componentHolder;
 +    }
 +
 +    @Override
 +    public ISearchOperationCallback getSearchOperationCallback() {
 +        return searchCallback;
 +    }
 +
 +    // A disk-only index should never need a modification callback
 +    @Override
 +    public IModificationOperationCallback getModificationCallback() {
 +        return null;
 +    }
 +
 +    @Override
 +    public void setCurrentMutableComponentId(int currentMutableComponentId) {
 +        // Do nothing: this method should never be called for this class
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentsToBeMerged() {
 +        return componentsToBeMerged;
 +    }
 +
 +    // Used by indexes with global transaction
 +    public int getTargetIndexVersion() {
 +        return targetIndexVersion;
 +    }
 +
 +    @Override
 +    public void setSearchPredicate(ISearchPredicate searchPredicate) {
 +        this.searchPredicate = searchPredicate;
 +    }
 +
 +    @Override
 +    public ISearchPredicate getSearchPredicate() {
 +        return searchPredicate;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentsToBeReplicated() {
 +        return componentsToBeReplicated;
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
index a837301,0000000..c44f529
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
@@@ -1,134 -1,0 +1,135 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.hyracks.storage.am.lsm.btree.impls;
 +
 +import java.util.LinkedList;
 +import java.util.List;
 +
 +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 +import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
 +import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 +import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
- import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
++import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
 +
- public class ExternalBTreeWithBuddyOpContext implements ILSMIndexOperationContext {
++public class ExternalBTreeWithBuddyOpContext extends AbstractLSMIndexOperationContext {
 +    private IndexOperation op;
 +    private MultiComparator bTreeCmp;
 +    private MultiComparator buddyBTreeCmp;
 +    public final List<ILSMComponent> componentHolder;
 +    private final List<ILSMComponent> componentsToBeMerged;
 +    private final List<ILSMComponent> componentsToBeReplicated;
 +    public final ISearchOperationCallback searchCallback;
 +    private final int targetIndexVersion;
 +    public ISearchPredicate searchPredicate;
 +    public LSMBTreeWithBuddyCursorInitialState searchInitialState;
 +
 +    public ExternalBTreeWithBuddyOpContext(IBinaryComparatorFactory[] btreeCmpFactories,
 +            IBinaryComparatorFactory[] buddyBtreeCmpFactories, ISearchOperationCallback searchCallback,
 +            int targetIndexVersion, ILSMHarness lsmHarness, ITreeIndexFrameFactory btreeInteriorFrameFactory,
 +            ITreeIndexFrameFactory btreeLeafFrameFactory, ITreeIndexFrameFactory buddyBtreeLeafFrameFactory) {
 +        this.componentHolder = new LinkedList<ILSMComponent>();
 +        this.componentsToBeMerged = new LinkedList<ILSMComponent>();
 +        this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
 +        this.searchCallback = searchCallback;
 +        this.targetIndexVersion = targetIndexVersion;
 +        this.bTreeCmp = MultiComparator.create(btreeCmpFactories);
 +        this.buddyBTreeCmp = MultiComparator.create(buddyBtreeCmpFactories);
 +        searchInitialState = new LSMBTreeWithBuddyCursorInitialState(btreeInteriorFrameFactory, btreeLeafFrameFactory,
 +                buddyBtreeLeafFrameFactory, lsmHarness, MultiComparator.create(btreeCmpFactories),
 +                MultiComparator.create(buddyBtreeCmpFactories), NoOpOperationCallback.INSTANCE, null);
 +    }
 +
 +    @Override
 +    public void setOperation(IndexOperation newOp) {
 +        reset();
 +        this.op = newOp;
 +    }
 +
 +    @Override
 +    public void setCurrentMutableComponentId(int currentMutableComponentId) {
 +        // Do nothing: this should never be called for disk-only indexes
 +    }
 +
 +    @Override
 +    public void reset() {
++        super.reset();
 +        componentHolder.clear();
 +        componentsToBeMerged.clear();
 +        componentsToBeReplicated.clear();
 +    }
 +
 +    @Override
 +    public IndexOperation getOperation() {
 +        return op;
 +    }
 +
 +    public MultiComparator getBTreeMultiComparator() {
 +        return bTreeCmp;
 +    }
 +
 +    public MultiComparator getBuddyBTreeMultiComparator() {
 +        return buddyBTreeCmp;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentHolder() {
 +        return componentHolder;
 +    }
 +
 +    @Override
 +    public ISearchOperationCallback getSearchOperationCallback() {
 +        return searchCallback;
 +    }
 +
 +    // This should never be needed for disk only indexes
 +    @Override
 +    public IModificationOperationCallback getModificationCallback() {
 +        return null;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentsToBeMerged() {
 +        return componentsToBeMerged;
 +    }
 +
 +    public int getTargetIndexVersion() {
 +        return targetIndexVersion;
 +    }
 +
 +    @Override
 +    public void setSearchPredicate(ISearchPredicate searchPredicate) {
 +        this.searchPredicate = searchPredicate;
 +    }
 +
 +    @Override
 +    public ISearchPredicate getSearchPredicate() {
 +        return searchPredicate;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentsToBeReplicated() {
 +        return componentsToBeReplicated;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index e833283,0000000..31c9d40
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@@ -1,218 -1,0 +1,219 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.hyracks.storage.am.lsm.btree.impls;
 +
 +import java.util.LinkedList;
 +import java.util.List;
 +
 +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 +import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
 +import org.apache.hyracks.storage.am.btree.impls.BTree;
 +import org.apache.hyracks.storage.am.btree.impls.BTreeOpContext;
 +import org.apache.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
 +import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 +import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
 +import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 +import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 +import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
- import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
++import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
 +
- public final class LSMBTreeOpContext implements ILSMIndexOperationContext {
++public final class LSMBTreeOpContext extends AbstractLSMIndexOperationContext {
 +
 +    public ITreeIndexFrameFactory insertLeafFrameFactory;
 +    public ITreeIndexFrameFactory deleteLeafFrameFactory;
 +    public IBTreeLeafFrame insertLeafFrame;
 +    public IBTreeLeafFrame deleteLeafFrame;
 +    public final BTree[] mutableBTrees;
 +    public BTree.BTreeAccessor[] mutableBTreeAccessors;
 +    public BTreeOpContext[] mutableBTreeOpCtxs;
 +    public BTree.BTreeAccessor currentMutableBTreeAccessor;
 +    public BTreeOpContext currentMutableBTreeOpCtx;
 +    public IndexOperation op;
 +    public final MultiComparator cmp;
 +    public final MultiComparator bloomFilterCmp;
 +    public IModificationOperationCallback modificationCallback;
 +    public ISearchOperationCallback searchCallback;
 +    private final List<ILSMComponent> componentHolder;
 +    private final List<ILSMComponent> componentsToBeMerged;
 +    private final List<ILSMComponent> componentsToBeReplicated;
 +    public final PermutingTupleReference indexTuple;
 +    public final MultiComparator filterCmp;
 +    public final PermutingTupleReference filterTuple;
 +    public ISearchPredicate searchPredicate;
 +    public BTreeRangeSearchCursor memCursor;
 +    public LSMBTreeCursorInitialState searchInitialState;
 +    public LSMBTreePointSearchCursor insertSearchCursor;
 +
 +    public LSMBTreeOpContext(List<ILSMComponent> mutableComponents, ITreeIndexFrameFactory insertLeafFrameFactory,
 +            ITreeIndexFrameFactory deleteLeafFrameFactory, IModificationOperationCallback modificationCallback,
 +            ISearchOperationCallback searchCallback, int numBloomFilterKeyFields, int[] btreeFields, int[] filterFields,
 +            ILSMHarness lsmHarness) {
 +        LSMBTreeMemoryComponent c = (LSMBTreeMemoryComponent) mutableComponents.get(0);
 +        IBinaryComparatorFactory cmpFactories[] = c.getBTree().getComparatorFactories();
 +        if (cmpFactories[0] != null) {
 +            this.cmp = MultiComparator.create(c.getBTree().getComparatorFactories());
 +        } else {
 +            this.cmp = null;
 +        }
 +
 +        bloomFilterCmp = MultiComparator.create(c.getBTree().getComparatorFactories(), 0, numBloomFilterKeyFields);
 +
 +        mutableBTrees = new BTree[mutableComponents.size()];
 +        mutableBTreeAccessors = new BTree.BTreeAccessor[mutableComponents.size()];
 +        mutableBTreeOpCtxs = new BTreeOpContext[mutableComponents.size()];
 +        for (int i = 0; i < mutableComponents.size(); i++) {
 +            LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) mutableComponents.get(i);
 +            mutableBTrees[i] = mutableComponent.getBTree();
 +            mutableBTreeAccessors[i] = (BTree.BTreeAccessor) mutableBTrees[i].createAccessor(modificationCallback,
 +                    NoOpOperationCallback.INSTANCE);
 +            mutableBTreeOpCtxs[i] = mutableBTreeAccessors[i].getOpContext();
 +        }
 +
 +        this.insertLeafFrameFactory = insertLeafFrameFactory;
 +        this.deleteLeafFrameFactory = deleteLeafFrameFactory;
 +        this.insertLeafFrame = (IBTreeLeafFrame) insertLeafFrameFactory.createFrame();
 +        this.deleteLeafFrame = (IBTreeLeafFrame) deleteLeafFrameFactory.createFrame();
 +        if (insertLeafFrame != null && this.cmp != null) {
 +            insertLeafFrame.setMultiComparator(cmp);
 +        }
 +        if (deleteLeafFrame != null && this.cmp != null) {
 +            deleteLeafFrame.setMultiComparator(cmp);
 +        }
 +        this.componentHolder = new LinkedList<ILSMComponent>();
 +        this.componentsToBeMerged = new LinkedList<ILSMComponent>();
 +        this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
 +        this.modificationCallback = modificationCallback;
 +        this.searchCallback = searchCallback;
 +
 +        if (filterFields != null) {
 +            indexTuple = new PermutingTupleReference(btreeFields);
 +            filterCmp = MultiComparator.create(c.getLSMComponentFilter().getFilterCmpFactories());
 +            filterTuple = new PermutingTupleReference(filterFields);
 +        } else {
 +            indexTuple = null;
 +            filterCmp = null;
 +            filterTuple = null;
 +        }
 +        searchPredicate = new RangePredicate(null, null, true, true, cmp, cmp);
 +        if (insertLeafFrame != null) {
 +            memCursor = new BTreeRangeSearchCursor(insertLeafFrame, false);
 +        }
 +
 +        searchInitialState = new LSMBTreeCursorInitialState(insertLeafFrameFactory, cmp, bloomFilterCmp, lsmHarness,
 +                null, searchCallback, null);
 +        insertSearchCursor = new LSMBTreePointSearchCursor(this);
 +    }
 +
 +    @Override
 +    public void setOperation(IndexOperation newOp) {
 +        reset();
 +        this.op = newOp;
 +    }
 +
 +    public void setInsertMode() {
 +        currentMutableBTreeOpCtx.leafFrame = insertLeafFrame;
 +        currentMutableBTreeOpCtx.leafFrameFactory = insertLeafFrameFactory;
 +    }
 +
 +    public void setDeleteMode() {
 +        currentMutableBTreeOpCtx.leafFrame = deleteLeafFrame;
 +        currentMutableBTreeOpCtx.leafFrameFactory = deleteLeafFrameFactory;
 +    }
 +
 +    @Override
 +    public void reset() {
++        super.reset();
 +        componentHolder.clear();
 +        componentsToBeMerged.clear();
 +        componentsToBeReplicated.clear();
 +    }
 +
 +    @Override
 +    public IndexOperation getOperation() {
 +        return op;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentHolder() {
 +        return componentHolder;
 +    }
 +
 +    @Override
 +    public ISearchOperationCallback getSearchOperationCallback() {
 +        return searchCallback;
 +    }
 +
 +    @Override
 +    public IModificationOperationCallback getModificationCallback() {
 +        return modificationCallback;
 +    }
 +
 +    @Override
 +    public void setCurrentMutableComponentId(int currentMutableComponentId) {
 +        currentMutableBTreeAccessor = mutableBTreeAccessors[currentMutableComponentId];
 +        currentMutableBTreeOpCtx = mutableBTreeOpCtxs[currentMutableComponentId];
 +        switch (op) {
 +            case SEARCH:
 +                break;
 +            case DISKORDERSCAN:
 +            case UPDATE:
 +                // Attention: It is important to leave the leafFrame and
 +                // leafFrameFactory of the mutableBTree as is when doing an update.
 +                // Update will only be set if a previous attempt to delete or
 +                // insert failed, so we must preserve the semantics of the
 +                // previously requested operation.
 +                break;
 +            case UPSERT:
 +            case INSERT:
 +                setInsertMode();
 +                break;
 +            case PHYSICALDELETE:
 +            case DELETE:
 +                setDeleteMode();
 +                break;
 +        }
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentsToBeMerged() {
 +        return componentsToBeMerged;
 +    }
 +
 +    @Override
 +    public void setSearchPredicate(ISearchPredicate searchPredicate) {
 +        this.searchPredicate = searchPredicate;
 +    }
 +
 +    @Override
 +    public ISearchPredicate getSearchPredicate() {
 +        return searchPredicate;
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getComponentsToBeReplicated() {
 +        return componentsToBeReplicated;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index c1cef2d,0000000..11b933d
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@@ -1,54 -1,0 +1,60 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.hyracks.storage.am.lsm.common.api;
 +
 +import java.util.List;
 +
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.storage.am.common.api.IIndex;
 +import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 +import org.apache.hyracks.storage.am.lsm.common.impls.LSMHarness;
 +
 +/**
 + * Methods to be implemented by an LSM index, which are called from {@link LSMHarness}.
 + * The implementations of the methods below should be thread agnostic.
 + * Synchronization of LSM operations like updates/searches/flushes/merges is
 + * done by the {@link LSMHarness}. For example, a flush() implementation should only
 + * create and return the new on-disk component, ignoring the fact that
 + * concurrent searches/updates/merges may be ongoing.
 + */
 +public interface ILSMIndex extends IIndex {
 +
 +    public void deactivate(boolean flushOnExit) throws HyracksDataException;
 +
++    @Override
 +    public ILSMIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
 +            ISearchOperationCallback searchCallback) throws HyracksDataException;
 +
 +    public ILSMOperationTracker getOperationTracker();
 +
 +    public ILSMIOOperationScheduler getIOScheduler();
 +
 +    public ILSMIOOperationCallback getIOOperationCallback();
 +
 +    public List<ILSMComponent> getImmutableComponents();
 +
 +    public boolean isPrimaryIndex();
++
++    /**
++     * @return true if the index is durable. Otherwise false.
++     */
++    public boolean isDurable();
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index 99f981d,0000000..acf2233
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@@ -1,44 -1,0 +1,51 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.hyracks.storage.am.lsm.common.api;
 +
 +import java.util.List;
 +
 +import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
 +import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 +import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
 +
 +public interface ILSMIndexOperationContext extends IIndexOperationContext {
 +    public List<ILSMComponent> getComponentHolder();
 +
 +    public List<ILSMComponent> getComponentsToBeMerged();
 +
 +    public ISearchOperationCallback getSearchOperationCallback();
 +
 +    public IModificationOperationCallback getModificationCallback();
 +
 +    public void setCurrentMutableComponentId(int currentMutableComponentId);
 +
 +    public void setSearchPredicate(ISearchPredicate searchPredicate);
 +
 +    public ISearchPredicate getSearchPredicate();
 +
 +    public List<ILSMComponent> getComponentsToBeReplicated();
++
++    /**
++     * @return true if this operation entered the components. Otherwise false.
++     */
++    public boolean isAccessingComponents();
++
++    public void setAccessingComponents(boolean accessingComponents);
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 441dda1,0000000..440ad31
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@@ -1,296 -1,0 +1,299 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.hyracks.storage.am.lsm.common.impls;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 +import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
 +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
 +import org.apache.hyracks.storage.am.common.api.ITreeIndex;
- import org.apache.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 +import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 +import org.apache.hyracks.storage.common.buffercache.IBufferCache;
- import org.apache.hyracks.storage.common.buffercache.ICachedPage;
- import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 +import org.apache.hyracks.storage.common.file.IFileMapProvider;
 +
 +public abstract class AbstractLSMIndex implements ILSMIndexInternal {
 +    protected final ILSMHarness lsmHarness;
 +
 +    protected final ILSMIOOperationScheduler ioScheduler;
 +    protected final ILSMIOOperationCallback ioOpCallback;
 +
 +    // In-memory components.
 +    protected final List<ILSMComponent> memoryComponents;
 +    protected final List<IVirtualBufferCache> virtualBufferCaches;
 +    protected AtomicInteger currentMutableComponentId;
 +
 +    // On-disk components.
 +    protected final IBufferCache diskBufferCache;
 +    protected final ILSMIndexFileManager fileManager;
 +    protected final IFileMapProvider diskFileMapProvider;
 +    protected final List<ILSMComponent> diskComponents;
 +    protected final List<ILSMComponent> inactiveDiskComponents;
 +    protected final double bloomFilterFalsePositiveRate;
 +    protected final ILSMComponentFilterFrameFactory filterFrameFactory;
 +    protected final LSMComponentFilterManager filterManager;
 +    protected final int[] filterFields;
 +    protected final boolean durable;
 +
 +    protected boolean isActivated;
 +    protected final AtomicBoolean[] flushRequests;
 +    protected boolean memoryComponentsAllocated = false;
 +
 +    public AbstractLSMIndex(List<IVirtualBufferCache> virtualBufferCaches, IBufferCache diskBufferCache,
-             ILSMIndexFileManager fileManager, IFileMapProvider diskFileMapProvider,
-             double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
-             ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallback ioOpCallback,
-             ILSMComponentFilterFrameFactory filterFrameFactory, LSMComponentFilterManager filterManager,
-             int[] filterFields, boolean durable) {
++            ILSMIndexFileManager fileManager, IFileMapProvider diskFileMapProvider, double bloomFilterFalsePositiveRate,
++            ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
++            ILSMIOOperationCallback ioOpCallback, ILSMComponentFilterFrameFactory filterFrameFactory,
++            LSMComponentFilterManager filterManager, int[] filterFields, boolean durable) {
 +        this.virtualBufferCaches = virtualBufferCaches;
 +        this.diskBufferCache = diskBufferCache;
 +        this.diskFileMapProvider = diskFileMapProvider;
 +        this.fileManager = fileManager;
 +        this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
 +        this.ioScheduler = ioScheduler;
 +        this.ioOpCallback = ioOpCallback;
 +        this.ioOpCallback.setNumOfMutableComponents(virtualBufferCaches.size());
 +        this.filterFrameFactory = filterFrameFactory;
 +        this.filterManager = filterManager;
 +        this.filterFields = filterFields;
 +        this.inactiveDiskComponents = new LinkedList<ILSMComponent>();
 +        this.durable = durable;
 +        lsmHarness = new LSMHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled());
 +        isActivated = false;
 +        diskComponents = new ArrayList<ILSMComponent>();
 +        memoryComponents = new ArrayList<ILSMComponent>();
 +        currentMutableComponentId = new AtomicInteger();
 +        flushRequests = new AtomicBoolean[virtualBufferCaches.size()];
 +        for (int i = 0; i < virtualBufferCaches.size(); i++) {
 +            flushRequests[i] = new AtomicBoolean();
 +        }
 +    }
 +
 +    // The constructor used by external indexes
 +    public AbstractLSMIndex(IBufferCache diskBufferCache, ILSMIndexFileManager fileManager,
 +            IFileMapProvider diskFileMapProvider, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy,
 +            ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallback ioOpCallback,
 +            boolean durable) {
 +        this.diskBufferCache = diskBufferCache;
 +        this.diskFileMapProvider = diskFileMapProvider;
 +        this.fileManager = fileManager;
 +        this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
 +        this.ioScheduler = ioScheduler;
 +        this.ioOpCallback = ioOpCallback;
 +        this.durable = durable;
 +        lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled());
 +        isActivated = false;
 +        diskComponents = new LinkedList<ILSMComponent>();
 +        this.inactiveDiskComponents = new LinkedList<ILSMComponent>();
 +        // Memory related objects are nulled
 +        this.virtualBufferCaches = null;
 +        memoryComponents = null;
 +        currentMutableComponentId = null;
 +        flushRequests = null;
 +        filterFrameFactory = null;
 +        filterManager = null;
 +        filterFields = null;
 +    }
 +
 +    protected void markAsValidInternal(ITreeIndex treeIndex) throws HyracksDataException {
 +        int fileId = treeIndex.getFileId();
 +        IBufferCache bufferCache = treeIndex.getBufferCache();
 +        treeIndex.getMetaManager().close();
 +        // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page
 +        // won't be flushed to disk because it won't be dirty until the write latch has been released.
 +        // Force modified metadata page to disk.
 +        // If the index is not durable, then the flush is not necessary.
 +        if (durable) {
 +            bufferCache.force(fileId, true);
 +        }
 +    }
 +
 +    protected void markAsValidInternal(IBufferCache bufferCache, BloomFilter filter) throws HyracksDataException {
-         if(durable){
-             bufferCache.force(filter.getFileId(),true);
++        if (durable) {
++            bufferCache.force(filter.getFileId(), true);
 +        }
 +    }
 +
 +    @Override
 +    public void addComponent(ILSMComponent c) throws HyracksDataException {
 +        diskComponents.add(0, c);
 +    }
 +
 +    @Override
 +    public void subsumeMergedComponents(ILSMComponent newComponent, List<ILSMComponent> mergedComponents)
 +            throws HyracksDataException {
 +        int swapIndex = diskComponents.indexOf(mergedComponents.get(0));
 +        diskComponents.removeAll(mergedComponents);
 +        diskComponents.add(swapIndex, newComponent);
 +    }
 +
 +    @Override
 +    public void changeMutableComponent() {
 +        currentMutableComponentId.set((currentMutableComponentId.get() + 1) % memoryComponents.size());
 +        ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).setActive();
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getImmutableComponents() {
 +        return diskComponents;
 +    }
 +
 +    @Override
 +    public void changeFlushStatusForCurrentMutableCompoent(boolean needsFlush) {
 +        flushRequests[currentMutableComponentId.get()].set(needsFlush);
 +    }
 +
 +    @Override
 +    public boolean hasFlushRequestForCurrentMutableComponent() {
 +        return flushRequests[currentMutableComponentId.get()].get();
 +    }
 +
 +    @Override
 +    public ILSMOperationTracker getOperationTracker() {
 +        return lsmHarness.getOperationTracker();
 +    }
 +
 +    @Override
 +    public ILSMIOOperationScheduler getIOScheduler() {
 +        return ioScheduler;
 +    }
 +
 +    @Override
 +    public ILSMIOOperationCallback getIOOperationCallback() {
 +        return ioOpCallback;
 +    }
 +
 +    @Override
 +    public IBufferCache getBufferCache() {
 +        return diskBufferCache;
 +    }
 +
-     public boolean isEmptyIndex() throws HyracksDataException {
++    public boolean isEmptyIndex() {
 +        boolean isModified = false;
 +        for (ILSMComponent c : memoryComponents) {
 +            AbstractMemoryLSMComponent mutableComponent = (AbstractMemoryLSMComponent) c;
 +            if (mutableComponent.isModified()) {
 +                isModified = true;
 +                break;
 +            }
 +        }
 +        return diskComponents.isEmpty() && !isModified;
 +    }
 +
 +    @Override
 +    public String toString() {
 +        return "LSMIndex [" + fileManager.getBaseDir() + "]";
 +    }
 +
 +    @Override
 +    public boolean hasMemoryComponents() {
 +        return true;
 +    }
 +
 +    @Override
 +    public boolean isCurrentMutableComponentEmpty() throws HyracksDataException {
 +        //check if the current memory component has been modified
 +        return !((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).isModified();
 +    }
 +
 +    public void setCurrentMutableComponentState(ComponentState componentState) {
 +        ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).setState(componentState);
 +    }
 +
 +    public ComponentState getCurrentMutableComponentState() {
 +        return ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).getState();
 +    }
 +
 +    public int getCurrentMutableComponentWriterCount() {
 +        return ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).getWriterCount();
 +    }
 +
 +    @Override
 +    public List<ILSMComponent> getInactiveDiskComponents() {
 +        return inactiveDiskComponents;
 +    }
 +
 +    @Override
 +    public void addInactiveDiskComponent(ILSMComponent diskComponent) {
 +        inactiveDiskComponents.add(diskComponent);
 +    }
 +
 +    public abstract Set<String> getLSMComponentPhysicalFiles(ILSMComponent newComponent);
 +
 +    @Override
 +    public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload,
 +            ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException {
 +        //set that accumulates the files to be replicated across all given components
 +        Set<String> componentFiles = new HashSet<String>();
 +
 +        //get set of files to be replicated for each component
 +        for (ILSMComponent lsmComponent : lsmComponents) {
 +            componentFiles.addAll(getLSMComponentPhysicalFiles(lsmComponent));
 +        }
 +
 +        ReplicationExecutionType executionType;
 +        if (bulkload) {
 +            executionType = ReplicationExecutionType.SYNC;
 +        } else {
 +            executionType = ReplicationExecutionType.ASYNC;
 +        }
 +
 +        //create replication job and submit it
 +        LSMIndexReplicationJob job = new LSMIndexReplicationJob(this, ctx, componentFiles, operation, executionType,
 +                opType);
 +        try {
 +            diskBufferCache.getIOReplicationManager().submitJob(job);
 +        } catch (IOException e) {
 +            throw new HyracksDataException(e);
 +        }
 +    }
 +
++    @Override
 +    public abstract void allocateMemoryComponents() throws HyracksDataException;
 +
++    @Override
 +    public boolean isMemoryComponentsAllocated() {
 +        return memoryComponentsAllocated;
 +    }
++
++    @Override
++    public boolean isDurable() {
++        return durable;
++    }
 +}