You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:36:08 UTC

[46/83] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java
index 04cbb05,0000000..fbb8f14
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java
@@@ -1,396 -1,0 +1,396 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
 +  
 +import java.io.Closeable;
 +import java.io.EOFException;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.EnumMap;
 +
++import com.gemstone.gemfire.internal.hll.ICardinality;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.BytesWritable;
 +import org.apache.hadoop.io.IOUtils;
 +import org.apache.hadoop.io.Text;
 +
 +import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
- import com.gemstone.gemfire.cache.hdfs.internal.cardinality.ICardinality;
 +import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
 +import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile;
 +import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Reader;
 +import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
 +import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 +import com.gemstone.gemfire.internal.Version;
 +
 +import org.apache.logging.log4j.Logger;
 +
 +/**
 + * Implements Sequence file based {@link Hoplog}
 + * 
 + * @author hemantb
 + *
 + */
 +public class SequenceFileHoplog extends AbstractHoplog{
 +  
 +   public SequenceFileHoplog(FileSystem inputFS, Path filePath,  
 +      SortedOplogStatistics stats)
 +  throws IOException
 +  {
 +     super(inputFS, filePath, stats);
 +  }
 +  @Override
 +  public void close() throws IOException {
 +    // Nothing to do 
 +  }
 +
 +  @Override
 +  public HoplogReader getReader() throws IOException {
 +    return new SequenceFileReader();
 +  }
 +
 +  @Override
 +  /**
 +   * gets the writer for sequence file. 
 +   * 
 +   * @param keys is not used for SequenceFileHoplog class 
 +   */
 +  public HoplogWriter createWriter(int keys) throws IOException {
 +    return new SequenceFileHoplogWriter();
 +  }
 +
 +  @Override
 +  public boolean isClosed() {
 +    return false;
 +  }
 +  
 +  @Override
 +  public void close(boolean clearCache) throws IOException {
 +    // Nothing to do 
 +  }
 +
 +  /**
 +   * Currently, hsync does not update the file size on namenode. So, if last time the 
 +   * process died after calling hsync but before calling file close, the file is 
 +   * left with an inconsistent file size. This is a workaround that - open the file stream in append 
 +   * mode and close it. This fixes the file size on the namenode.
 +   * 
 +   * @throws IOException
 +   * @return true if the file size was fixed 
 +   */
 +  public boolean fixFileSize() throws IOException {
 +    // Try to fix the file size
 +    // Loop so that the expected expceptions can be ignored 3
 +   // times
 +    if (logger.isDebugEnabled())
 +      logger.debug("{}Fixing size of hoplog " + path, logPrefix);
 +    Exception e = null;
 +    boolean exceptionThrown = false;
 +    for (int i =0; i < 3; i++) {
 +      try {
 +        FSDataOutputStream stream = fsProvider.getFS().append(path);
 +        stream.close();
 +        stream = null;
 +      } catch (IOException ie) {
 +        exceptionThrown = true;
 +        e = ie;
 +        if (logger.isDebugEnabled())
 +        logger.debug("{}Retry run " + (i + 1) + ": Hoplog " + path + " is still a temporary " +
 +            "hoplog because the node managing it wasn't shutdown properly last time. Failed to " +
 +            "fix the hoplog because an exception was thrown " + e, logPrefix );
 +      }
 +      // As either RecoveryInProgressException was thrown or 
 +      // Already being created exception was thrown, wait for 
 +      // sometime before next retry. 
 +      if (exceptionThrown) {
 +        try {
 +          Thread.sleep(5000);
 +        } catch (InterruptedException e1) {
 +        } 
 +        exceptionThrown = false;
 +      } else {
 +        // no exception was thrown, break;
 +        return true;
 +      }
 +    }
 +    logger.info (logPrefix, LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + path + " is still a temporary " +
 +        "hoplog because the node managing it wasn't shutdown properly last time. Failed to " +
 +        "fix the hoplog because an exception was thrown " + e));
 +    
 +    return false;
 +  }
 +  
 +  @Override
 +  public String toString() {
 +    return "SequenceFileHplog[" + getFileName() + "]";
 +  }
 +  
 +  private class SequenceFileHoplogWriter implements HoplogWriter {
 +    
 +    private SequenceFile.Writer writer = null;
 +    
 +    public SequenceFileHoplogWriter() throws IOException{
 +      writer = AbstractHoplog.getSequenceFileWriter(path, conf, logger);
 +    }
 +   
 +    @Override
 +    public void close() throws IOException {
 +      writer.close();
 +      if (logger.isDebugEnabled())
 +        logger.debug("{}Completed creating hoplog " + path, logPrefix);
 +    }
 +    
 +    @Override
 +    public void hsync() throws IOException {
 +      writer.hsyncWithSizeUpdate();
 +      if (logger.isDebugEnabled())
 +        logger.debug("{}hsync'ed a batch of data to hoplog " + path, logPrefix);
 +    }
 +    
 +    @Override
 +    public void append(byte[] key, byte[] value) throws IOException {
 +      writer.append(new BytesWritable(key), new BytesWritable(value));
 +    }
 +
 +    @Override
 +    public void append(ByteBuffer key, ByteBuffer value) throws IOException {
 +      throw new UnsupportedOperationException("Not supported for Sequence files");
 +    }
 +
 +    @Override
 +    public void close(EnumMap<Meta, byte[]> metadata) throws IOException {
 +      throw new UnsupportedOperationException("Not supported for Sequence files");
 +    }
 +    @Override
 +    public long getCurrentSize() throws IOException {
 +      return writer.getLength();
 +    }
 +    
 +  }
 +  /**
 +   * Sequence file reader. This is currently to be used only by MapReduce jobs and 
 +   * test functions
 +   * 
 +   */
 +  public class SequenceFileReader implements HoplogReader, Closeable {
 +    @Override
 +    public byte[] read(byte[] key) throws IOException {
 +      throw new UnsupportedOperationException("Not supported for Sequence files");
 +    }
 +
 +    @Override
 +    public HoplogIterator<byte[], byte[]> scan()
 +        throws IOException {
 +      return  new SequenceFileIterator(fsProvider.getFS(), path, 0, Long.MAX_VALUE, conf, logger);
 +    }
 +
 +    @Override
 +    public HoplogIterator<byte[], byte[]> scan(
 +        byte[] from, byte[] to) throws IOException {
 +      throw new UnsupportedOperationException("Not supported for Sequence files");
 +    }
 +    
 +    @Override
 +    public HoplogIterator<byte[], byte[]> scan(
 +        long startOffset, long length) throws IOException {
 +      return  new SequenceFileIterator(fsProvider.getFS(), path, startOffset, length, conf, logger);
 +    }
 +    
 +    @Override
 +    public HoplogIterator<byte[], byte[]> scan(
 +        byte[] from, boolean fromInclusive, byte[] to, boolean toInclusive)
 +        throws IOException {
 +      throw new UnsupportedOperationException("Not supported for Sequence files");
 +    }
 +
 +    @Override
 +    public boolean isClosed() {
 +      throw new UnsupportedOperationException("Not supported for Sequence files.");
 +    }
 +    
 +    @Override
 +    public void close() throws IOException {
 +      throw new UnsupportedOperationException("Not supported for Sequence files. Close the iterator instead.");
 +    }
 +
 +    @Override
 +    public ByteBuffer get(byte[] key) throws IOException {
 +      throw new UnsupportedOperationException("Not supported for Sequence files");
 +    }
 +
 +    @Override
 +    public BloomFilter getBloomFilter() throws IOException {
 +      throw new UnsupportedOperationException("Not supported for Sequence files");
 +    }
 +
 +    @Override
 +    public long getEntryCount() {
 +      throw new UnsupportedOperationException("Not supported for Sequence files");
 +    }
 +
 +    @Override
 +    public ICardinality getCardinalityEstimator() {
 +      throw new UnsupportedOperationException("Not supported for Sequence files");
 +    }
 +
 +    @Override
 +    public long sizeEstimate() {
 +      throw new UnsupportedOperationException("Not supported for Sequence files");
 +    }
 +
 +
 +  }
 +  
 +  /**
 +   * Sequence file iterator. This is currently to be used only by MapReduce jobs and 
 +   * test functions
 +   * 
 +   */
 +  public static class SequenceFileIterator implements HoplogIterator<byte[], byte[]> {
 +    
 +    SequenceFile.Reader reader = null;
 +    private BytesWritable prefetchedKey = null;
 +    private BytesWritable prefetchedValue = null;
 +    private byte[] currentKey;
 +    private byte[] currentValue;
 +    boolean hasNext = false;
 +    Logger logger; 
 +    Path path;
 +    private long start;
 +    private long end;
 +    
 +    public SequenceFileIterator(FileSystem fs, Path path, long startOffset, 
 +        long length, Configuration conf, Logger logger) 
 +        throws IOException {
 +      Reader.Option optPath = SequenceFile.Reader.file(path);
 +      
 +      // Hadoop has a configuration parameter io.serializations that is a list of serialization 
 +      // classes which can be used for obtaining serializers and deserializers. This parameter 
 +      // by default contains avro classes. When a sequence file is created, it calls 
 +      // SerializationFactory.getSerializer(keyclass). This internally creates objects using 
 +      // reflection of all the classes that were part of io.serializations. But since, there is 
 +      // no avro class available it throws an exception. 
 +      // Before creating a sequenceFile, override the io.serializations parameter and pass only the classes 
 +      // that are important to us. 
 +      String serializations[] = conf.getStrings("io.serializations",
 +          new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"});
 +      conf.setStrings("io.serializations",
 +          new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"});
 +      // create reader
 +      boolean emptyFile = false;
 +      try {
 +        reader = new SequenceFile.Reader(conf, optPath);
 +      }catch (EOFException e) {
 +        // this is ok as the file has ended. just return false that no more records available
 +        emptyFile = true;
 +      }
 +      // reset the configuration to its original value 
 +      conf.setStrings("io.serializations", serializations);
 +      this.logger = logger;
 +      this.path = path;
 +      
 +      if (emptyFile) {
 +        hasNext = false;
 +      } else {
 +        // The file should be read from the first sync marker after the start position and 
 +        // until the first sync marker after the end position is seen. 
 +        this.end = startOffset + length;
 +        if (startOffset > reader.getPosition()) {
 +          reader.sync(startOffset);                  // sync to start
 +        }
 +        this.start = reader.getPosition();
 +        this.hasNext = this.start < this.end;
 +        if (hasNext)
 +          readNext();
 +      } 
 +    }
 +  
 +
 +    public Version getVersion(){
 +      String version = reader.getMetadata().get(new Text(Meta.GEMFIRE_VERSION.name())).toString();
 +      return Version.fromOrdinalOrCurrent(Short.parseShort(version)); 
 +    }
 +    @Override
 +    public boolean hasNext() {
 +      return hasNext;
 +    }
 +
 +    @Override
 +    public byte[] next() {
 +      currentKey = prefetchedKey.getBytes();
 +      currentValue = prefetchedValue.getBytes();
 +      
 +      readNext();
 +
 +      return currentKey;
 +    }
 +    
 +    private void readNext() {
 +      try {
 +        long pos = reader.getPosition();
 +        prefetchedKey = new BytesWritable();
 +        prefetchedValue = new BytesWritable();
 +        hasNext = reader.next(prefetchedKey, prefetchedValue);
 +        // The file should be read from the first sync marker after the start position and 
 +        // until the first sync marker after the end position is seen. 
 +        if (pos >= end && reader.syncSeen()) {
 +          hasNext = false;
 +        }
 +      } catch (EOFException e) {
 +        // this is ok as the file has ended. just return false that no more records available
 +        hasNext = false;
 +      } 
 +      catch (IOException e) {
 +        hasNext = false;
 +        logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE, path), e);
 +        throw new HDFSIOException(
 +            LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path), e);
 +      }
 +    }
 +    @Override
 +    public void remove() {
 +      throw new UnsupportedOperationException("Not supported for Sequence files");
 +    }
 +
 +    @Override
 +    public void close() {
 +      IOUtils.closeStream(reader);
 +    }
 +
 +    @Override
 +    public byte[] getKey() {
 +      return currentKey;
 +    }
 +
 +    @Override
 +    public byte[] getValue() {
 +      return currentValue;
 +    }
 +    
 +    /** Returns true iff the previous call to next passed a sync mark.*/
 +    public boolean syncSeen() { return reader.syncSeen(); }
 +
 +    /** Return the current byte position in the input file. */
 +    public synchronized long getPosition() throws IOException {
 +      return reader.getPosition();
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/IndexElemArray.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/IndexElemArray.java
index b94f975,0000000..de694a4
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/IndexElemArray.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/IndexElemArray.java
@@@ -1,333 -1,0 +1,361 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.query.internal.index;
 +
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Iterator;
 +import java.util.NoSuchElementException;
 +
 +/**
 + * A wrapper around an object array for storing values in index data structure
 + * with minimal set of operations supported and the maximum size of 128 elements  
 + * 
 + * @author Tejas Nomulwar
 + * @since 7.0
 + */
 +public class IndexElemArray implements Iterable, Collection {
 +
 +  private Object[] elementData;
 +  private volatile byte size;
 +
++  /* lock for making size and data changes atomically. */
++  private Object lock = new Object();
++
 +  public IndexElemArray(int initialCapacity) {
 +    if (initialCapacity < 0) {
 +      throw new IllegalArgumentException("Illegal Capacity: " + initialCapacity);
 +    }
 +    this.elementData = new Object[initialCapacity];
 +  }
 +
 +  /**
 +   * Constructs an empty list with an initial capacity of ten.
 +   */
 +  public IndexElemArray() {
 +    this(IndexManager.INDEX_ELEMARRAY_SIZE);
 +  }
 +
 +  /**
 +   * Increases the capacity of this <tt>ArrayList</tt> instance, if necessary,
 +   * to ensure that it can hold at least the number of elements specified by the
 +   * minimum capacity argument.
 +   * 
 +   * @param minCapacity
 +   *          the desired minimum capacity
 +   */
 +  private void ensureCapacity(int minCapacity) {
 +    int oldCapacity = elementData.length;
 +    if (minCapacity > oldCapacity) {
 +      int newCapacity = oldCapacity + 5;
 +      if (newCapacity < minCapacity) {
 +        newCapacity = minCapacity;
 +      }
 +      // minCapacity is usually close to size, so this is a win:
 +      Object[] newElementData = new Object[newCapacity];
 +      System.arraycopy(this.elementData, 0, newElementData, 0,
 +          this.elementData.length);
 +      elementData = newElementData;
 +    }
 +  }
 +
 +  /**
 +   * Returns the number of elements in this list. (Warning: May not return
 +   * correct size always, as remove operation is not atomic)
 +   * 
 +   * @return the number of elements in this list
 +   */
 +  public int size() {
 +    return size;
 +  }
 +
 +  /**
 +   * Returns <tt>true</tt> if this list contains no elements.
 +   * 
 +   * @return <tt>true</tt> if this list contains no elements
 +   */
 +  public boolean isEmpty() {
 +    return size == 0;
 +  }
 +
 +  /**
 +   * Returns <tt>true</tt> if this list contains the specified element. More
 +   * formally, returns <tt>true</tt> if and only if this list contains at least
 +   * one element <tt>e</tt> such that
 +   * <tt>(o==null&nbsp;?&nbsp;e==null&nbsp;:&nbsp;o.equals(e))</tt>.
 +   * 
 +   * @param o
 +   *          element whose presence in this list is to be tested
 +   * @return <tt>true</tt> if this list contains the specified element
 +   */
 +  public boolean contains(Object o) {
 +    return indexOf(o) >= 0;
 +  }
 +
 +  /**
 +   * Returns the index of the first occurrence of the specified element in this
 +   * list, or -1 if this list does not contain the element. More formally,
 +   * returns the lowest index <tt>i</tt> such that
 +   * <tt>(o==null&nbsp;?&nbsp;get(i)==null&nbsp;:&nbsp;o.equals(get(i)))</tt>,
 +   * or -1 if there is no such index.
 +   */
 +  public int indexOf(Object o) {
-     if (o == null) {
-       for (int i = 0; i < size; i++)
-         if (elementData[i] == null)
-           return i;
-     } else {
-       for (int i = 0; i < size; i++)
-         if (o.equals(elementData[i]))
-           return i;
++    synchronized (lock) {
++      if (o == null) {
++        for (int i = 0; i < size; i++)
++          if (elementData[i] == null)
++            return i;
++      } else {
++        for (int i = 0; i < size; i++)
++          if (o.equals(elementData[i]))
++            return i;
++      }
 +    }
 +    return -1;
 +  }
 +
 +  /**
 +   * Returns the element at the specified position in this list.
 +   * 
 +   * @param index
 +   *          index of the element to return
 +   * @return the element at the specified position in this list
 +   * @throws IndexOutOfBoundsException
 +   *          
 +   */
 +  public Object get(int index) {
-     RangeCheck(index);
-     return elementData[index];
++    synchronized (lock) {
++      RangeCheck(index);
++      return elementData[index];
++    }
 +  }
 +
 +  /**
 +   * Replaces the element at the specified position in this list with the
 +   * specified element.
 +   * 
 +   * @param index
 +   *          index of the element to replace
 +   * @param element
 +   *          element to be stored at the specified position
 +   * @return the element previously at the specified position
 +   * @throws IndexOutOfBoundsException
 +   *           
 +   */
 +  public Object set(int index, Object element) {
-     RangeCheck(index);
++    synchronized (lock) {
++      RangeCheck(index);
 +
-     Object oldValue = (Object) elementData[index];
-     elementData[index] = element;
-     return oldValue;
++      Object oldValue = (Object) elementData[index];
++      elementData[index] = element;
++      return oldValue;
++    }
 +  }
 +
 +  /**
 +   * Appends the specified element to the end of this array.
 +   * If the array is full, creates a new array with 
 +   * new capacity = old capacity + 5
 +   * 
 +   * @param e
 +   *          element to be appended to this list
 +   * @return <tt>true</tt> (as specified by {@link Collection#add})
 +   * @throws ArrayIndexOutOfBoundsException
 +   */
-   public synchronized boolean add(Object e) {
-     ensureCapacity(size + 1);
-     elementData[size] = e;
-     ++size;
++  public boolean add(Object e) {
++    synchronized (lock) {
++      ensureCapacity(size + 1);
++      elementData[size] = e;
++      ++size;
++    }
 +    return true;
 +  }
 +
 +  /**
 +   * Removes the first occurrence of the specified element from this list, if it
 +   * is present. If the list does not contain the element, it is unchanged. More
 +   * formally, removes the element with the lowest index <tt>i</tt> such that
 +   * <tt>(o==null&nbsp;?&nbsp;get(i)==null&nbsp;:&nbsp;o.equals(get(i)))</tt>
 +   * (if such an element exists). Returns <tt>true</tt> if this list contained
 +   * the specified element (or equivalently, if this list changed as a result of
 +   * the call).
 +   * 
 +   * @param o
 +   *          element to be removed from this list, if present
 +   * @return <tt>true</tt> if this list contained the specified element
 +   */
-   public synchronized boolean remove(Object o) {
++  public boolean remove(Object o) {
 +    if (o == null) {
 +      for (int index = 0; index < size; index++)
 +        if (elementData[index] == null) {
 +          fastRemove(index);
 +          return true;
 +        }
 +    } else {
 +      for (int index = 0; index < size; index++)
 +        if (o.equals(elementData[index])) {
 +          fastRemove(index);
 +          return true;
 +        }
 +    }
 +    return false;
 +  }
 +
 +  /*
 +   * Private remove method that skips bounds checking and does not return the
 +   * value removed.
 +   */
 +  private void fastRemove(int index) {
 +    int len = elementData.length;
 +    Object[] newArray = new Object[len - 1];
 +    System.arraycopy(elementData, 0, newArray, 0, index);
 +    int numMoved = len - index - 1;
 +    if (numMoved > 0)
 +      System.arraycopy(elementData, index + 1, newArray, index, numMoved);
-     elementData = newArray;
-     --size;
++
++    synchronized (lock) {
++      elementData = newArray;
++      --size;
++    }
 +  }
 +
 +  /**
 +   * Removes all of the elements from this list. The list will be empty after
 +   * this call returns.
 +   */
 +  public void clear() {
 +    // Let gc do its work
-     for (int i = 0; i < size; i++) {
-       elementData[i] = null;
++    synchronized (lock) {
++      for (int i = 0; i < size; i++) {
++        elementData[i] = null;
++      }
++      size = 0;
 +    }
-     size = 0;
 +  }
 +
 +  /**
 +   * Checks if the given index is in range. If not, throws an appropriate
 +   * runtime exception. This method does *not* check if the index is negative:
 +   * It is always used immediately prior to an array access, which throws an
 +   * ArrayIndexOutOfBoundsException if index is negative.
 +   */
 +  private void RangeCheck(int index) {
 +    if (index >= size) {
 +      throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + size);
 +    }
 +  }
 +
 +  @Override
-   public synchronized boolean addAll(Collection c) {
++  public boolean addAll(Collection c) {
 +    Object[] a = c.toArray();
 +    int numNew = a.length;
-     ensureCapacity(size + numNew);
-     System.arraycopy(a, 0, elementData, size, numNew);
-     size += numNew;
++    synchronized (lock) {
++      ensureCapacity(size + numNew);
++      System.arraycopy(a, 0, elementData, size, numNew);
++      size += numNew;
++    }
 +    return numNew != 0;
 +  }
 +
 +  @Override
 +  public Object[] toArray() {
 +    return Arrays.copyOf(elementData, size);
 +  }
 +
 +  @Override
 +  public Iterator iterator() {
 +    return new IndexArrayListIterator();
 +  }
 +
 +  private class IndexArrayListIterator implements Iterator {
 +    private byte current;
 +    private Object currentEntry;
++    private Object[] elements;
++    private int len;
++
++    IndexArrayListIterator() {
++      synchronized (lock) {
++        elements = elementData;
++        len = size;
++      }
++    }
 +    
 +    /**
 +     * Checks if the array has next element, stores reference to the current
 +     * element and increments cursor. This is required since an element may be
 +     * removed between hasNext() and next() method calls
 +     * 
 +     */
 +    @Override
 +    public boolean hasNext() {
-       return current < size;
++      return current < len;
 +    }
 +
 +    /**
 +     * Returns next element. But does not increment the cursor.
 +     * Always use hasNext() before this method call
 +     */
 +    @Override
 +    public Object next() {
 +      try {
-         currentEntry = elementData[current++];
++        currentEntry = elements[current++];
 +      } catch (IndexOutOfBoundsException e) {
-         // Following exception must never be thrown.
-         //throw new NoSuchElementException();
-         return null;
++        // We should not be coming here as element-data and
++        // size are updated atomically.
++        throw new NoSuchElementException();
++        //return null;
 +      }
 +      return currentEntry;
 +    }
 +
 +    @Override
 +    public void remove() {
 +      throw new UnsupportedOperationException(
 +          "remove() method is not supported");
 +    }
 +
 +  }
 +
 +  @Override
 +  public Object[] toArray(Object[] a) {
 +    throw new UnsupportedOperationException(
 +        "toArray(Object[] a) method is not supported");
 +  }
 +
 +  @Override
 +  public boolean containsAll(Collection c) {
 +    throw new UnsupportedOperationException(
 +        "containsAll() method is not supported");
 +  }
 +
 +  @Override
 +  public boolean removeAll(Collection c) {
 +    throw new UnsupportedOperationException(
 +        "removeAll() method is not supported");
 +  }
 +
 +  @Override
 +  public boolean retainAll(Collection c) {
 +    throw new UnsupportedOperationException(
 +        "retainAll() method is not supported");
 +  }
 +
 +  //for internal testing only
 +  public Object[] getElementData() {
 +    return elementData;
 +  }
 +}