You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/06/16 23:28:40 UTC

[3/4] git commit: Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT

Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT

Conflicts:
	server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dd422b96
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dd422b96
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dd422b96

Branch: refs/heads/master
Commit: dd422b963c1c372f73114fbbb0f36f954bd91a5f
Parents: b604e1d a61a795
Author: Eric C. Newton <er...@gmail.com>
Authored: Mon Jun 16 16:49:28 2014 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Mon Jun 16 16:49:28 2014 -0400

----------------------------------------------------------------------
 .../tabletserver/LargestFirstMemoryManager.java | 170 +++++++++++--------
 1 file changed, 102 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/dd422b96/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
index dd1a6ef,0000000..b891ad6
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
@@@ -1,200 -1,0 +1,234 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.tabletserver;
 +
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
++import java.util.Map.Entry;
++import java.util.TreeMap;
 +
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +
++/**
++ * The LargestFirstMemoryManager attempts to keep memory between 80% and 90% full. Over time it adapts the point at which it should start a compaction, based on
++ * how full memory gets between successive calls. It will also flush idle tablets based on a per-table configurable idle time. In a single call it stops
++ * selecting tablets once the memory being compacted exceeds 20% of the total memtable size. As the name of the class suggests, it flushes the tablets with the
++ * highest memory footprint first; more precisely, it ranks each tablet by its size doubled for every 15 minutes of idle time.
++ */
 +public class LargestFirstMemoryManager implements MemoryManager {
 +  
 +  private static final Logger log = Logger.getLogger(LargestFirstMemoryManager.class);
++  private static final long ZERO_TIME = System.currentTimeMillis();
 +  private static final int TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER = 2;
++  private static final double MAX_FLUSH_AT_ONCE_PERCENT = 0.20;
 +  
 +  private long maxMemory = -1;
 +  private int maxConcurrentMincs;
 +  private int numWaitingMultiplier;
 +  private long prevIngestMemory;
++  // The fraction of memory that needs to be used before we begin flushing.
 +  private double compactionThreshold;
 +  private long maxObserved;
-   private HashMap<Text,Long> mincIdleThresholds;
-   private static final long zerotime = System.currentTimeMillis();
++  private final HashMap<Text,Long> mincIdleThresholds = new HashMap<Text,Long>();
 +  private ServerConfiguration config = null;
 +  
++  private static class TabletInfo {
++    final KeyExtent extent;
++    final long memTableSize;
++    final long idleTime;
++    final long load;
++    
++    public TabletInfo(KeyExtent extent, long memTableSize, long idleTime, long load) {
++      this.extent = extent;
++      this.memTableSize = memTableSize;
++      this.idleTime = idleTime;
++      this.load = load;
++    }
++  }
++  
++  // A little map that will hold the "largest" N tablets, where largest is a result of the timeMemoryLoad function
++  @SuppressWarnings("serial")
++  private static class LargestMap extends TreeMap<Long,TabletInfo> {
++    final int max;
++    
++    LargestMap(int n) {
++      max = n;
++    }
++    
++    @Override
++    public TabletInfo put(Long key, TabletInfo value) {
++      if (size() == max) {
++        if (key.compareTo(this.firstKey()) < 0)
++          return value;
++        try {
++          return super.put(key, value);
++        } finally {
++          super.remove(this.firstKey());
++        }
++      } else {
++        return super.put(key, value);
++      }
++    }
++  }
++  
 +  LargestFirstMemoryManager(long maxMemory, int maxConcurrentMincs, int numWaitingMultiplier) {
 +    this();
 +    this.maxMemory = maxMemory;
 +    this.maxConcurrentMincs = maxConcurrentMincs;
 +    this.numWaitingMultiplier = numWaitingMultiplier;
 +  }
 +  
 +  @Override
 +  public void init(ServerConfiguration conf) {
 +    this.config = conf;
 +    maxMemory = conf.getConfiguration().getMemoryInBytes(Property.TSERV_MAXMEM);
 +    maxConcurrentMincs = conf.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT);
 +    numWaitingMultiplier = TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER;
 +  }
 +  
 +  public LargestFirstMemoryManager() {
 +    prevIngestMemory = 0;
 +    compactionThreshold = 0.5;
 +    maxObserved = 0;
-     mincIdleThresholds = new HashMap<Text,Long>();
++  }
++  
++  private long getMinCIdleThreshold(KeyExtent extent) {
++    Text tableId = extent.getTableId();
++    if (!mincIdleThresholds.containsKey(tableId))
++      mincIdleThresholds.put(tableId, config.getTableConfiguration(tableId.toString()).getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME));
++    return mincIdleThresholds.get(tableId);
 +  }
 +  
 +  @Override
 +  public MemoryManagementActions getMemoryManagementActions(List<TabletState> tablets) {
 +    if (maxMemory < 0)
-       throw new IllegalStateException("need to initialize Largst");
++      throw new IllegalStateException("need to initialize " + LargestFirstMemoryManager.class.getName());
++    
++    final int maxMinCs = maxConcurrentMincs * numWaitingMultiplier;
++    
 +    mincIdleThresholds.clear();
++    final MemoryManagementActions result = new MemoryManagementActions();
++    result.tabletsToMinorCompact = new ArrayList<KeyExtent>();
++    
++    TreeMap<Long,TabletInfo> largestMemTablets = new LargestMap(maxMinCs);
++    final TreeMap<Long,TabletInfo> largestIdleMemTablets = new LargestMap(maxConcurrentMincs);
++    final long now = System.currentTimeMillis();
++    
 +    long ingestMemory = 0;
 +    long compactionMemory = 0;
-     KeyExtent largestMemTablet = null;
-     long largestMemTableLoad = 0;
-     KeyExtent largestIdleMemTablet = null;
-     long largestIdleMemTableLoad = 0;
-     long mts;
-     long mcmts;
 +    int numWaitingMincs = 0;
-     long idleTime;
-     long tml;
-     long ct = System.currentTimeMillis();
-     
-     long largestMemTableIdleTime = -1, largestMemTableSize = -1;
-     long largestIdleMemTableIdleTime = -1, largestIdleMemTableSize = -1;
 +    
++    // find the largest and most idle tablets
 +    for (TabletState ts : tablets) {
-       mts = ts.getMemTableSize();
-       mcmts = ts.getMinorCompactingMemTableSize();
-       if (ts.getLastCommitTime() > 0)
-         idleTime = ct - ts.getLastCommitTime();
-       else
-         idleTime = ct - zerotime;
-       ingestMemory += mts;
-       tml = timeMemoryLoad(mts, idleTime);
-       if (mcmts == 0 && mts > 0) {
-         if (tml > largestMemTableLoad) {
-           largestMemTableLoad = tml;
-           largestMemTablet = ts.getExtent();
-           largestMemTableSize = mts;
-           largestMemTableIdleTime = idleTime;
-         }
-         Text tableId = ts.getExtent().getTableId();
-         if (!mincIdleThresholds.containsKey(tableId))
-           mincIdleThresholds.put(tableId, config.getTableConfiguration(tableId.toString()).getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME));
-         if (idleTime > mincIdleThresholds.get(tableId) && tml > largestIdleMemTableLoad) {
-           largestIdleMemTableLoad = tml;
-           largestIdleMemTablet = ts.getExtent();
-           largestIdleMemTableSize = mts;
-           largestIdleMemTableIdleTime = idleTime;
++      final long memTabletSize = ts.getMemTableSize();
++      final long minorCompactingSize = ts.getMinorCompactingMemTableSize();
++      final long idleTime = now - Math.max(ts.getLastCommitTime(), ZERO_TIME);
++      final long timeMemoryLoad = timeMemoryLoad(memTabletSize, idleTime);
++      ingestMemory += memTabletSize;
++      if (minorCompactingSize == 0 && memTabletSize > 0) {
++        TabletInfo tabletInfo = new TabletInfo(ts.getExtent(), memTabletSize, idleTime, timeMemoryLoad);
++        largestMemTablets.put(timeMemoryLoad, tabletInfo);
++        if (idleTime > getMinCIdleThreshold(ts.getExtent())) {
++          largestIdleMemTablets.put(timeMemoryLoad, tabletInfo);
 +        }
-         // log.debug("extent: "+ts.getExtent()+" idle threshold: "+mincIdleThresholds.get(tableId)+" idle time: "+idleTime+" memtable: "+mts+" compacting: "+mcmts);
 +      }
-       // else {
-       // log.debug("skipping extent "+ts.getExtent()+", nothing in memory");
-       // }
 +      
-       compactionMemory += mcmts;
-       if (mcmts > 0)
++      compactionMemory += minorCompactingSize;
++      if (minorCompactingSize > 0)
 +        numWaitingMincs++;
 +    }
 +    
 +    if (ingestMemory + compactionMemory > maxObserved) {
 +      maxObserved = ingestMemory + compactionMemory;
 +    }
 +    
-     long memoryChange = ingestMemory - prevIngestMemory;
++    final long memoryChange = ingestMemory - prevIngestMemory;
 +    prevIngestMemory = ingestMemory;
 +    
-     MemoryManagementActions mma = new MemoryManagementActions();
-     mma.tabletsToMinorCompact = new ArrayList<KeyExtent>();
-     
 +    boolean startMinC = false;
 +    
-     if (numWaitingMincs < maxConcurrentMincs * numWaitingMultiplier) {
++    if (numWaitingMincs < maxMinCs) {
 +      // based on previous ingest memory increase, if we think that the next increase will
 +      // take us over the threshold for non-compacting memory, then start a minor compaction
 +      // or if the idle time of the chosen tablet is greater than the threshold, start a minor compaction
 +      if (memoryChange >= 0 && ingestMemory + memoryChange > compactionThreshold * maxMemory) {
 +        startMinC = true;
-       } else if (largestIdleMemTablet != null) {
++      } else if (!largestIdleMemTablets.isEmpty()) {
 +        startMinC = true;
-         // switch largestMemTablet to largestIdleMemTablet
-         largestMemTablet = largestIdleMemTablet;
-         largestMemTableLoad = largestIdleMemTableLoad;
-         largestMemTableSize = largestIdleMemTableSize;
-         largestMemTableIdleTime = largestIdleMemTableIdleTime;
++        // switch largestMemTablets to largestIdleMemTablets
++        largestMemTablets = largestIdleMemTablets;
 +        log.debug("IDLE minor compaction chosen");
 +      }
 +    }
 +    
-     if (startMinC && largestMemTablet != null) {
-       mma.tabletsToMinorCompact.add(largestMemTablet);
-       log.debug(String.format("COMPACTING %s  total = %,d ingestMemory = %,d", largestMemTablet.toString(), (ingestMemory + compactionMemory), ingestMemory));
-       log.debug(String.format("chosenMem = %,d chosenIT = %.2f load %,d", largestMemTableSize, largestMemTableIdleTime / 1000.0, largestMemTableLoad));
++    if (startMinC) {
++      long toBeCompacted = compactionMemory;
++      for (int i = numWaitingMincs; i < maxMinCs && !largestMemTablets.isEmpty(); i++) {
++        Entry<Long,TabletInfo> lastEntry = largestMemTablets.lastEntry();
++        TabletInfo largest = lastEntry.getValue();
++        toBeCompacted += largest.memTableSize;
++        result.tabletsToMinorCompact.add(largest.extent);
++        log.debug(String.format("COMPACTING %s  total = %,d ingestMemory = %,d", largest.extent.toString(), (ingestMemory + compactionMemory), ingestMemory));
++        log.debug(String.format("chosenMem = %,d chosenIT = %.2f load %,d", largest.memTableSize, largest.idleTime / 1000.0, largest.load));
++        largestMemTablets.remove(lastEntry.getKey());
++        if (toBeCompacted > ingestMemory * MAX_FLUSH_AT_ONCE_PERCENT)
++          break;
++      }
 +    } else if (memoryChange < 0) {
 +      // before idle mincs, starting a minor compaction meant that memoryChange >= 0.
 +      // we thought we might want to remove the "else" if that changed,
 +      // however it seems performing idle compactions shouldn't make the threshold
 +      // change more often, so it is staying for now.
 +      // also, now we have the case where memoryChange < 0 due to an idle compaction, yet
 +      // we are still adjusting the threshold. should this be tracked and prevented?
 +      
 +      // memory change < 0 means a minor compaction occurred
 +      // we want to see how full the memory got during the compaction
 +      // (the goal is for it to have between 80% and 90% memory utilization)
 +      // and adjust the compactionThreshold accordingly
 +      
 +      log.debug(String.format("BEFORE compactionThreshold = %.3f maxObserved = %,d", compactionThreshold, maxObserved));
-       
 +      if (compactionThreshold < 0.82 && maxObserved < 0.8 * maxMemory) {
 +        // 0.82 * 1.1 is about 0.9, which is our desired max threshold
 +        compactionThreshold *= 1.1;
 +      } else if (compactionThreshold > 0.056 && maxObserved > 0.9 * maxMemory) {
 +        // 0.056 * 0.9 is about 0.05, which is our desired min threshold
 +        compactionThreshold *= 0.9;
 +      }
 +      maxObserved = 0;
 +      
 +      log.debug(String.format("AFTER compactionThreshold = %.3f", compactionThreshold));
 +    }
 +    
-     return mma;
++    return result;
 +  }
 +  
 +  @Override
 +  public void tabletClosed(KeyExtent extent) {}
 +  
++  // The load function: the memory size, doubled for every 15 minutes of idle time
 +  static long timeMemoryLoad(long mem, long time) {
 +    double minutesIdle = time / 60000.0;
 +    
 +    return (long) (mem * Math.pow(2, minutesIdle / 15.0));
 +  }
-   
-   public static void main(String[] args) {
-     for (int i = 0; i < 62; i++) {
-       System.out.printf("%d\t%d%n", i, timeMemoryLoad(1, i * 60000l));
-     }
-   }
 +}