You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/06 17:44:34 UTC

svn commit: r1143470 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/MessageDatabase.java test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java

Author: tabish
Date: Wed Jul  6 15:44:33 2011
New Revision: 1143470

URL: http://svn.apache.org/viewvc?rev=1143470&view=rev
Log:
Added a test case for https://issues.apache.org/jira/browse/AMQ-3342 and applied the submitted patch with a few modifications.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1143470&r1=1143469&r2=1143470&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Wed Jul  6 15:44:33 2011
@@ -85,8 +85,8 @@ import org.apache.kahadb.util.StringMars
 import org.apache.kahadb.util.VariableMarshaller;
 
 public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
-	
-	protected BrokerService brokerService;
+
+    protected BrokerService brokerService;
 
     public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
     public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0"));
@@ -161,7 +161,7 @@ public class MessageDatabase extends Ser
             } else {
                 os.writeBoolean(false);
             }
-            
+
             if (producerSequenceIdTrackerLocation != null) {
                 os.writeBoolean(true);
                 LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
@@ -185,8 +185,8 @@ public class MessageDatabase extends Ser
     }
 
     protected PageFile pageFile;
-	protected Journal journal;
-	protected Metadata metadata = new Metadata();
+    protected Journal journal;
+    protected Metadata metadata = new Metadata();
 
     protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
 
@@ -205,8 +205,8 @@ public class MessageDatabase extends Ser
     int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     boolean enableIndexWriteAsync = false;
     int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
-    
-    
+
+
     protected AtomicBoolean opened = new AtomicBoolean();
     private LockFile lockFile;
     private boolean ignoreMissingJournalfiles = false;
@@ -230,10 +230,10 @@ public class MessageDatabase extends Ser
         unload();
     }
 
-	private void loadPageFile() throws IOException {
-	    this.indexLock.writeLock().lock();
-	    try {
-		    final PageFile pageFile = getPageFile();
+    private void loadPageFile() throws IOException {
+        this.indexLock.writeLock().lock();
+        try {
+            final PageFile pageFile = getPageFile();
             pageFile.load();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
@@ -269,13 +269,13 @@ public class MessageDatabase extends Ser
                     }
                 }
             });
-            pageFile.flush();            
+            pageFile.flush();
         }finally {
             this.indexLock.writeLock().unlock();
         }
-	}
-	
-	private void startCheckpoint() {
+    }
+
+    private void startCheckpoint() {
         synchronized (checkpointThreadLock) {
             boolean start = false;
             if (checkpointThread == null) {
@@ -319,36 +319,47 @@ public class MessageDatabase extends Ser
                 checkpointThread.start();
             }
         }
-	}
+    }
 
-	public void open() throws IOException {
-		if( opened.compareAndSet(false, true) ) {
+    public void open() throws IOException {
+        if( opened.compareAndSet(false, true) ) {
             getJournal().start();
-	        loadPageFile();        
-	        startCheckpoint();
+            loadPageFile();
+            startCheckpoint();
             recover();
-		}
-	}
+        }
+    }
 
     private void lock() throws IOException {
-        if( lockFile == null ) {
+
+        if (lockFile == null) {
             File lockFileName = new File(directory, "lock");
             lockFile = new LockFile(lockFileName, true);
             if (failIfDatabaseIsLocked) {
                 lockFile.lock();
             } else {
-                while (true) {
+                boolean locked = false;
+                while ((!isStopped()) && (!isStopping())) {
                     try {
                         lockFile.lock();
+                        locked = true;
                         break;
                     } catch (IOException e) {
-                        LOG.info("Database "+lockFileName+" is locked... waiting " + (getDatabaseLockedWaitDelay() / 1000) + " seconds for the database to be unlocked. Reason: " + e);
+                        LOG.info("Database "
+                                + lockFileName
+                                + " is locked... waiting "
+                                + (getDatabaseLockedWaitDelay() / 1000)
+                                + " seconds for the database to be unlocked. Reason: "
+                                + e);
                         try {
                             Thread.sleep(getDatabaseLockedWaitDelay());
                         } catch (InterruptedException e1) {
                         }
                     }
                 }
+                if (!locked) {
+                    throw new IOException("attempt to obtain lock aborted due to shutdown");
+                }
             }
         }
     }
@@ -359,7 +370,7 @@ public class MessageDatabase extends Ser
     }
 
     public void load() throws IOException {
-    	
+
         this.indexLock.writeLock().lock();
         try {
             lock();
@@ -373,45 +384,45 @@ public class MessageDatabase extends Ser
                 deleteAllMessages = false;
             }
 
-	    	open();
-	        store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
+            open();
+            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
         }finally {
             this.indexLock.writeLock().unlock();
         }
 
     }
 
-    
-	public void close() throws IOException, InterruptedException {
-		if( opened.compareAndSet(true, false)) {
-		    this.indexLock.writeLock().lock();
-	        try {
-	            pageFile.tx().execute(new Transaction.Closure<IOException>() {
-	                public void execute(Transaction tx) throws IOException {
-	                    checkpointUpdate(tx, true);
-	                }
-	            });
-	            pageFile.unload();
-	            metadata = new Metadata();
-	        }finally {
-	            this.indexLock.writeLock().unlock();
-	        }
-	        journal.close();
+
+    public void close() throws IOException, InterruptedException {
+        if( opened.compareAndSet(true, false)) {
+            this.indexLock.writeLock().lock();
+            try {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        checkpointUpdate(tx, true);
+                    }
+                });
+                pageFile.unload();
+                metadata = new Metadata();
+            }finally {
+                this.indexLock.writeLock().unlock();
+            }
+            journal.close();
             synchronized (checkpointThreadLock) {
-	            checkpointThread.join();
+                checkpointThread.join();
             }
-	        lockFile.unlock();
-	        lockFile=null;
-		}
-	}
-	
+            lockFile.unlock();
+            lockFile=null;
+        }
+    }
+
     public void unload() throws IOException, InterruptedException {
         this.indexLock.writeLock().lock();
         try {
             if( pageFile != null && pageFile.isLoaded() ) {
                 metadata.state = CLOSED_STATE;
                 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
-    
+
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         tx.store(metadata.page, metadataMarshaller, true);
@@ -444,7 +455,7 @@ public class MessageDatabase extends Ser
     /**
      * Move all the messages that were in the journal into long term storage. We
      * just replay and do a checkpoint.
-     * 
+     *
      * @throws IOException
      * @throws IOException
      * @throws IllegalStateException
@@ -452,28 +463,28 @@ public class MessageDatabase extends Ser
     private void recover() throws IllegalStateException, IOException {
         this.indexLock.writeLock().lock();
         try {
-            
-	        long start = System.currentTimeMillis();        
-	        Location producerAuditPosition = recoverProducerAudit();
-	        Location lastIndoubtPosition = getRecoveryPosition();
-	        
-	        Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
-	            
-	        if (recoveryPosition != null) {  
-	            int redoCounter = 0;
-	            LOG.info("Recovering from the journal ...");
-	            while (recoveryPosition != null) {
-	                JournalCommand<?> message = load(recoveryPosition);
-	                metadata.lastUpdate = recoveryPosition;
-	                process(message, recoveryPosition, lastIndoubtPosition);
-	                redoCounter++;
-	                recoveryPosition = journal.getNextLocation(recoveryPosition);
-	            }
-	            long end = System.currentTimeMillis();
-	            LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
-	        }
-	        
-	        // We may have to undo some index updates.
+
+            long start = System.currentTimeMillis();
+            Location producerAuditPosition = recoverProducerAudit();
+            Location lastIndoubtPosition = getRecoveryPosition();
+
+            Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
+
+            if (recoveryPosition != null) {
+                int redoCounter = 0;
+                LOG.info("Recovering from the journal ...");
+                while (recoveryPosition != null) {
+                    JournalCommand<?> message = load(recoveryPosition);
+                    metadata.lastUpdate = recoveryPosition;
+                    process(message, recoveryPosition, lastIndoubtPosition);
+                    redoCounter++;
+                    recoveryPosition = journal.getNextLocation(recoveryPosition);
+                }
+                long end = System.currentTimeMillis();
+                LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
+            }
+
+            // We may have to undo some index updates.
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     recoverIndex(tx);
@@ -498,59 +509,59 @@ public class MessageDatabase extends Ser
             this.indexLock.writeLock().unlock();
         }
     }
-    
-	private Location minimum(Location producerAuditPosition,
+
+    private Location minimum(Location producerAuditPosition,
             Location lastIndoubtPosition) {
-	    Location min = null;
-	    if (producerAuditPosition != null) {
-	        min = producerAuditPosition;
-	        if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
-	            min = lastIndoubtPosition;
-	        }
-	    } else {
-	        min = lastIndoubtPosition;
-	    }
-	    return min;
-    }
-	
-	private Location recoverProducerAudit() throws IOException {
-	    if (metadata.producerSequenceIdTrackerLocation != null) {
-	        KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
-	        try {
-	            ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
-	            metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
-	        } catch (ClassNotFoundException cfe) {
-	            IOException ioe = new IOException("Failed to read producerAudit: " + cfe);
-	            ioe.initCause(cfe);
-	            throw ioe;
-	        }
-	        return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
-	    } else {
-	        // got no audit stored so got to recreate via replay from start of the journal
-	        return journal.getNextLocation(null);
-	    }
+        Location min = null;
+        if (producerAuditPosition != null) {
+            min = producerAuditPosition;
+            if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
+                min = lastIndoubtPosition;
+            }
+        } else {
+            min = lastIndoubtPosition;
+        }
+        return min;
+    }
+
+    private Location recoverProducerAudit() throws IOException {
+        if (metadata.producerSequenceIdTrackerLocation != null) {
+            KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
+            try {
+                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
+                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
+            } catch (ClassNotFoundException cfe) {
+                IOException ioe = new IOException("Failed to read producerAudit: " + cfe);
+                ioe.initCause(cfe);
+                throw ioe;
+            }
+            return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
+        } else {
+            // got no audit stored so got to recreate via replay from start of the journal
+            return journal.getNextLocation(null);
+        }
     }
 
     protected void recoverIndex(Transaction tx) throws IOException {
         long start = System.currentTimeMillis();
-        // It is possible index updates got applied before the journal updates.. 
+        // It is possible index updates got applied before the journal updates..
         // in that case we need to removed references to messages that are not in the journal
         final Location lastAppendLocation = journal.getLastAppendLocation();
         long undoCounter=0;
-        
+
         // Go through all the destinations to see if they have messages past the lastAppendLocation
         for (StoredDestination sd : storedDestinations.values()) {
-        	
+
             final ArrayList<Long> matches = new ArrayList<Long>();
             // Find all the Locations that are >= than the last Append Location.
             sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
-				@Override
-				protected void matched(Location key, Long value) {
-					matches.add(value);
-				}
+                @Override
+                protected void matched(Location key, Long value) {
+                    matches.add(value);
+                }
             });
-            
-            
+
+
             for (Long sequenceId : matches) {
                 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                 sd.locationIndex.remove(tx, keys.location);
@@ -558,14 +569,14 @@ public class MessageDatabase extends Ser
                 metadata.producerSequenceIdTracker.rollback(new MessageId(keys.messageId));
                 undoCounter++;
                 // TODO: do we need to modify the ack positions for the pub sub case?
-			}
+            }
         }
 
         long end = System.currentTimeMillis();
         if( undoCounter > 0 ) {
-        	// The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
-        	// should do sync writes to the journal.
-	        LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
+            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
+            // should do sync writes to the journal.
+            LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
         }
 
         undoCounter = 0;
@@ -662,44 +673,44 @@ public class MessageDatabase extends Ser
                 }
             }
         }
-        
+
         end = System.currentTimeMillis();
         if( undoCounter > 0 ) {
-        	// The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
-        	// should do sync writes to the journal.
-	        LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
+            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
+            // should do sync writes to the journal.
+            LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
         }
-	}
+    }
 
-	private Location nextRecoveryPosition;
-	private Location lastRecoveryPosition;
+    private Location nextRecoveryPosition;
+    private Location lastRecoveryPosition;
 
-	public void incrementalRecover() throws IOException {
-	    this.indexLock.writeLock().lock();
+    public void incrementalRecover() throws IOException {
+        this.indexLock.writeLock().lock();
         try {
-	        if( nextRecoveryPosition == null ) {
-	        	if( lastRecoveryPosition==null ) {
-	        		nextRecoveryPosition = getRecoveryPosition();
-	        	} else {
-	                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
-	        	}        	
-	        }
-	        while (nextRecoveryPosition != null) {
-	        	lastRecoveryPosition = nextRecoveryPosition;
-	            metadata.lastUpdate = lastRecoveryPosition;
-	            JournalCommand<?> message = load(lastRecoveryPosition);
-	            process(message, lastRecoveryPosition);            
-	            nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
-	        }
+            if( nextRecoveryPosition == null ) {
+                if( lastRecoveryPosition==null ) {
+                    nextRecoveryPosition = getRecoveryPosition();
+                } else {
+                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+                }
+            }
+            while (nextRecoveryPosition != null) {
+                lastRecoveryPosition = nextRecoveryPosition;
+                metadata.lastUpdate = lastRecoveryPosition;
+                JournalCommand<?> message = load(lastRecoveryPosition);
+                process(message, lastRecoveryPosition);
+                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+            }
         }finally {
             this.indexLock.writeLock().unlock();
         }
-	}
-	
+    }
+
     public Location getLastUpdatePosition() throws IOException {
         return metadata.lastUpdate;
     }
-    
+
     private Location getRecoveryPosition() throws IOException {
 
         if (!this.forceRecoverIndex) {
@@ -708,7 +719,7 @@ public class MessageDatabase extends Ser
             if (metadata.firstInProgressTransactionLocation != null) {
                 return metadata.firstInProgressTransactionLocation;
             }
-        
+
             // Perhaps there were no transactions...
             if( metadata.lastUpdate!=null) {
                 // Start replay at the record after the last one recorded in the index file.
@@ -717,16 +728,16 @@ public class MessageDatabase extends Ser
         }
         // This loads the first position.
         return journal.getNextLocation(null);
-	}
+    }
 
     protected void checkpointCleanup(final boolean cleanup) throws IOException {
-    	long start;
-    	this.indexLock.writeLock().lock();
+        long start;
+        this.indexLock.writeLock().lock();
         try {
             start = System.currentTimeMillis();
-        	if( !opened.get() ) {
-        		return;
-        	}
+            if( !opened.get() ) {
+                return;
+            }
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     checkpointUpdate(tx, cleanup);
@@ -735,15 +746,15 @@ public class MessageDatabase extends Ser
         }finally {
             this.indexLock.writeLock().unlock();
         }
-    	long end = System.currentTimeMillis();
-    	if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-    		LOG.info("Slow KahaDB access: cleanup took "+(end-start));
-    	}
+        long end = System.currentTimeMillis();
+        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+            LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+        }
     }
 
-    
-	public void checkpoint(Callback closure) throws Exception {
-	    this.indexLock.writeLock().lock();
+
+    public void checkpoint(Callback closure) throws Exception {
+        this.indexLock.writeLock().lock();
         try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
@@ -754,7 +765,7 @@ public class MessageDatabase extends Ser
         }finally {
             this.indexLock.writeLock().unlock();
         }
-	}
+    }
 
     // /////////////////////////////////////////////////////////////////
     // Methods call by the broker to update and query the store.
@@ -770,27 +781,27 @@ public class MessageDatabase extends Ser
      * during a recovery process.
      */
     public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
-    	if (before != null) {
-    	    before.run();
-    	}
+        if (before != null) {
+            before.run();
+        }
         try {
             int size = data.serializedSizeFramed();
             DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
             os.writeByte(data.type().getNumber());
             data.writeFramed(os);
-    
+
             long start = System.currentTimeMillis();
             Location location = journal.write(os.toByteSequence(), sync);
             long start2 = System.currentTimeMillis();
             process(data, location);
-        	long end = System.currentTimeMillis();
-        	if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-        		LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
-        	}
-    
-        	this.indexLock.writeLock().lock();
+            long end = System.currentTimeMillis();
+            if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+                LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+            }
+
+            this.indexLock.writeLock().lock();
             try {
-            	metadata.lastUpdate = location;
+                metadata.lastUpdate = location;
             }finally {
                 this.indexLock.writeLock().unlock();
             }
@@ -801,16 +812,16 @@ public class MessageDatabase extends Ser
                 after.run();
             }
             return location;
-    	} catch (IOException ioe) {
+        } catch (IOException ioe) {
             LOG.error("KahaDB failed to store to Journal", ioe);
             brokerService.handleIOException(ioe);
-    	    throw ioe;
-    	}
+            throw ioe;
+        }
     }
 
     /**
      * Loads a previously stored JournalMessage
-     * 
+     *
      * @param location
      * @return
      * @throws IOException
@@ -832,7 +843,7 @@ public class MessageDatabase extends Ser
         message.mergeFramed(is);
         return message;
     }
-    
+
     /**
      * do minimal recovery till we reach the last inDoubtLocation
      * @param data
@@ -1014,7 +1025,7 @@ public class MessageDatabase extends Ser
     // /////////////////////////////////////////////////////////////////
 
     protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
-	private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
+    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
 
     void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
@@ -1062,7 +1073,7 @@ public class MessageDatabase extends Ser
     void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
         if (!command.hasSubscriptionKey()) {
-            
+
             // In the queue case we just remove the message from the index..
             Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
             if (sequenceId != null) {
@@ -1070,7 +1081,7 @@ public class MessageDatabase extends Ser
                 if (keys != null) {
                     sd.locationIndex.remove(tx, keys.location);
                     recordAckMessageReferenceLocation(ackLocation, keys.location);
-                }                
+                }
             }
         } else {
             // In the topic case we need remove the message once it's been acked
@@ -1110,7 +1121,7 @@ public class MessageDatabase extends Ser
     void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
         sd.orderIndex.remove(tx);
-        
+
         sd.locationIndex.clear(tx);
         sd.locationIndex.unload(tx);
         tx.free(sd.locationIndex.getPageId());
@@ -1159,7 +1170,7 @@ public class MessageDatabase extends Ser
             removeAckLocationsForSub(tx, sd, subscriptionKey);
         }
     }
-    
+
     /**
      * @param tx
      * @throws IOException
@@ -1183,10 +1194,10 @@ public class MessageDatabase extends Ser
 
             LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
 
-        	// Don't GC files under replication
-        	if( journalFilesBeingReplicated!=null ) {
-        		gcCandidateSet.removeAll(journalFilesBeingReplicated);
-        	}
+            // Don't GC files under replication
+            if( journalFilesBeingReplicated!=null ) {
+                gcCandidateSet.removeAll(journalFilesBeingReplicated);
+            }
 
             // Don't GC files after the first in progress tx
             if( metadata.firstInProgressTransactionLocation!=null ) {
@@ -1194,58 +1205,58 @@ public class MessageDatabase extends Ser
                    firstTxLocation = metadata.firstInProgressTransactionLocation;
                 };
             }
-            
+
             if( firstTxLocation!=null ) {
-            	while( !gcCandidateSet.isEmpty() ) {
-            		Integer last = gcCandidateSet.last();
-            		if( last >= firstTxLocation.getDataFileId() ) {
-            			gcCandidateSet.remove(last);
-            		} else {
-            			break;
-            		}
-            	}
+                while( !gcCandidateSet.isEmpty() ) {
+                    Integer last = gcCandidateSet.last();
+                    if( last >= firstTxLocation.getDataFileId() ) {
+                        gcCandidateSet.remove(last);
+                    } else {
+                        break;
+                    }
+                }
                 LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet);
             }
 
             // Go through all the destinations to see if any of them can remove GC candidates.
             for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
-            	if( gcCandidateSet.isEmpty() ) {
-                	break;
+                if( gcCandidateSet.isEmpty() ) {
+                    break;
                 }
 
                 // Use a visitor to cut down the number of pages that we load
                 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                     int last=-1;
                     public boolean isInterestedInKeysBetween(Location first, Location second) {
-                    	if( first==null ) {
-                    		SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
-                    		if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
-                    			subset.remove(second.getDataFileId());
-                    		}
-							return !subset.isEmpty();
-                    	} else if( second==null ) {
-                    		SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
-                    		if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
-                    			subset.remove(first.getDataFileId());
-                    		}
-							return !subset.isEmpty();
-                    	} else {
-                    		SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
-                    		if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
-                    			subset.remove(first.getDataFileId());
-                    		}
-                    		if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
-                    			subset.remove(second.getDataFileId());
-                    		}
-							return !subset.isEmpty();
-                    	}
+                        if( first==null ) {
+                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
+                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+                                subset.remove(second.getDataFileId());
+                            }
+                            return !subset.isEmpty();
+                        } else if( second==null ) {
+                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
+                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+                                subset.remove(first.getDataFileId());
+                            }
+                            return !subset.isEmpty();
+                        } else {
+                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
+                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+                                subset.remove(first.getDataFileId());
+                            }
+                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+                                subset.remove(second.getDataFileId());
+                            }
+                            return !subset.isEmpty();
+                        }
                     }
 
                     public void visit(List<Location> keys, List<Long> values) {
-                    	for (Location l : keys) {
+                        for (Location l : keys) {
                             int fileId = l.getDataFileId();
-							if( last != fileId ) {
-                        		gcCandidateSet.remove(fileId);
+                            if( last != fileId ) {
+                                gcCandidateSet.remove(fileId);
                                 last = fileId;
                             }
                         }
@@ -1279,14 +1290,14 @@ public class MessageDatabase extends Ser
             }
 
             if( !gcCandidateSet.isEmpty() ) {
-	            LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
-	            journal.removeDataFiles(gcCandidateSet);
+                LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
+                journal.removeDataFiles(gcCandidateSet);
             }
         }
-        
+
         LOG.debug("Checkpoint done.");
     }
-    
+
     private Location checkpointProducerAudit() throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ObjectOutputStream oout = new ObjectOutputStream(baos);
@@ -1297,15 +1308,15 @@ public class MessageDatabase extends Ser
     }
 
     public HashSet<Integer> getJournalFilesBeingReplicated() {
-		return journalFilesBeingReplicated;
-	}
+        return journalFilesBeingReplicated;
+    }
 
     // /////////////////////////////////////////////////////////////////
     // StoredDestination related implementation methods.
     // /////////////////////////////////////////////////////////////////
 
 
-	private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
+    private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
 
     class StoredSubscription {
         SubscriptionInfo subscriptionInfo;
@@ -1313,25 +1324,25 @@ public class MessageDatabase extends Ser
         Location lastAckLocation;
         Location cursor;
     }
-    
+
     static class MessageKeys {
         final String messageId;
         final Location location;
-        
+
         public MessageKeys(String messageId, Location location) {
             this.messageId=messageId;
             this.location=location;
         }
-        
+
         @Override
         public String toString() {
             return "["+messageId+","+location+"]";
         }
     }
-    
+
     static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
         static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
-        
+
         public MessageKeys readPayload(DataInput dataIn) throws IOException {
             return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
         }
@@ -1371,7 +1382,7 @@ public class MessageDatabase extends Ser
     }
 
     protected class LastAckMarshaller implements Marshaller<LastAck> {
-        
+
         public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
             dataOut.writeLong(object.lastAckedSequence);
             dataOut.writeByte(object.priority);
@@ -1400,7 +1411,7 @@ public class MessageDatabase extends Ser
     }
 
     class StoredDestination {
-        
+
         MessageOrderIndex orderIndex = new MessageOrderIndex();
         BTreeIndex<Location, Long> locationIndex;
         BTreeIndex<String, Long> messageIdIndex;
@@ -1553,7 +1564,7 @@ public class MessageDatabase extends Ser
         rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
         rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
         rc.messageIdIndex.load(tx);
-        
+
         // If it was a topic...
         if (topic) {
 
@@ -1581,11 +1592,11 @@ public class MessageDatabase extends Ser
                         Long sequence = orderIterator.next().getKey();
                         addAckLocation(tx, rc, sequence, entry.getKey());
                     }
-                    // modify so it is upgraded                   
+                    // modify so it is upgraded
                     rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
                 }
             }
-            
+
             if (rc.orderIndex.nextMessageId == 0) {
                 // check for existing durable sub all acked out - pull next seq from acks as messages are gone
                 if (!rc.subscriptionAcks.isEmpty(tx)) {
@@ -1609,7 +1620,7 @@ public class MessageDatabase extends Ser
         if (metadata.version < 3) {
             // store again after upgrade
             metadata.destinations.put(tx, key, rc);
-        }        
+        }
         return rc;
     }
 
@@ -1838,7 +1849,7 @@ public class MessageDatabase extends Ser
     public int getJournalMaxWriteBatchSize() {
         return journalMaxWriteBatchSize;
     }
-    
+
     public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
         this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
     }
@@ -1858,7 +1869,7 @@ public class MessageDatabase extends Ser
     public void setDeleteAllMessages(boolean deleteAllMessages) {
         this.deleteAllMessages = deleteAllMessages;
     }
-    
+
     public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
         this.setIndexWriteBatchSize = setIndexWriteBatchSize;
     }
@@ -1866,15 +1877,15 @@ public class MessageDatabase extends Ser
     public int getIndexWriteBatchSize() {
         return setIndexWriteBatchSize;
     }
-    
+
     public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
         this.enableIndexWriteAsync = enableIndexWriteAsync;
     }
-    
+
     boolean isEnableIndexWriteAsync() {
         return enableIndexWriteAsync;
     }
-    
+
     public boolean isEnableJournalDiskSyncs() {
         return enableJournalDiskSyncs;
     }
@@ -1902,40 +1913,40 @@ public class MessageDatabase extends Ser
     public void setJournalMaxFileLength(int journalMaxFileLength) {
         this.journalMaxFileLength = journalMaxFileLength;
     }
-    
+
     public int getJournalMaxFileLength() {
         return journalMaxFileLength;
     }
-    
+
     public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
         this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
     }
-    
+
     public int getMaxFailoverProducersToTrack() {
         return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
     }
-    
+
     public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
         this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
     }
-    
+
     public int getFailoverProducersAuditDepth() {
         return this.metadata.producerSequenceIdTracker.getAuditDepth();
     }
-    
+
     public PageFile getPageFile() {
         if (pageFile == null) {
             pageFile = createPageFile();
         }
-		return pageFile;
-	}
+        return pageFile;
+    }
 
-	public Journal getJournal() throws IOException {
+    public Journal getJournal() throws IOException {
         if (journal == null) {
             journal = createJournal();
         }
-		return journal;
-	}
+        return journal;
+    }
 
     public boolean isFailIfDatabaseIsLocked() {
         return failIfDatabaseIsLocked;
@@ -1948,7 +1959,7 @@ public class MessageDatabase extends Ser
     public boolean isIgnoreMissingJournalfiles() {
         return ignoreMissingJournalfiles;
     }
-    
+
     public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
         this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
     }
@@ -1977,9 +1988,9 @@ public class MessageDatabase extends Ser
         this.checksumJournalFiles = checksumJournalFiles;
     }
 
-	public void setBrokerService(BrokerService brokerService) {
-		this.brokerService = brokerService;
-	}
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
 
     /**
      * @return the archiveDataLogs
@@ -2056,29 +2067,29 @@ public class MessageDatabase extends Ser
         long highPriorityCursorPosition;
         MessageOrderCursor(){
         }
-        
+
         MessageOrderCursor(long position){
             this.defaultCursorPosition=position;
             this.lowPriorityCursorPosition=position;
             this.highPriorityCursorPosition=position;
         }
-        
+
         MessageOrderCursor(MessageOrderCursor other){
             this.defaultCursorPosition=other.defaultCursorPosition;
             this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
             this.highPriorityCursorPosition=other.highPriorityCursorPosition;
         }
-        
+
         MessageOrderCursor copy() {
             return new MessageOrderCursor(this);
         }
-        
+
         void reset() {
             this.defaultCursorPosition=0;
             this.highPriorityCursorPosition=0;
             this.lowPriorityCursorPosition=0;
         }
-        
+
         void increment() {
             if (defaultCursorPosition!=0) {
                 defaultCursorPosition++;
@@ -2103,7 +2114,7 @@ public class MessageDatabase extends Ser
             this.highPriorityCursorPosition=other.highPriorityCursorPosition;
         }
     }
-    
+
     class MessageOrderIndex {
         static final byte HI = 9;
         static final byte LO = 0;
@@ -2129,7 +2140,7 @@ public class MessageDatabase extends Ser
             }
             return result;
         }
-        
+
         void load(Transaction tx) throws IOException {
             defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
             defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
@@ -2141,7 +2152,7 @@ public class MessageDatabase extends Ser
             highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
             highPriorityIndex.load(tx);
         }
-        
+
         void allocate(Transaction tx) throws IOException {
             defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
             if (metadata.version >= 2) {
@@ -2149,7 +2160,7 @@ public class MessageDatabase extends Ser
                 highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
             }
         }
-        
+
         void configureLast(Transaction tx) throws IOException {
             // Figure out the next key using the last entry in the destination.
             if (highPriorityIndex != null) {
@@ -2174,8 +2185,8 @@ public class MessageDatabase extends Ser
                 }
             }
         }
-        
-               
+
+
         void remove(Transaction tx) throws IOException {
             defaultPriorityIndex.clear(tx);
             defaultPriorityIndex.unload(tx);
@@ -2192,14 +2203,14 @@ public class MessageDatabase extends Ser
                 tx.free(highPriorityIndex.getPageId());
             }
         }
-        
+
         void resetCursorPosition() {
             this.cursor.reset();
             lastDefaultKey = null;
             lastHighKey = null;
             lastLowKey = null;
         }
-        
+
         void setBatch(Transaction tx, Long sequence) throws IOException {
             if (sequence != null) {
                 Long nextPosition = new Long(sequence.longValue() + 1);
@@ -2243,7 +2254,7 @@ public class MessageDatabase extends Ser
                 }
             }
         }
-        
+
         void stoppedIterating() {
             if (lastDefaultKey!=null) {
                 cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
@@ -2258,7 +2269,7 @@ public class MessageDatabase extends Ser
             lastHighKey = null;
             lastLowKey = null;
         }
-        
+
         void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
                 throws IOException {
             if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
@@ -2269,18 +2280,18 @@ public class MessageDatabase extends Ser
                 getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
             }
         }
-        
+
         void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
                 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
 
             Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
             deletes.add(iterator.next());
         }
-        
+
         long getNextMessageId(int priority) {
             return nextMessageId++;
         }
-        
+
         MessageKeys get(Transaction tx, Long key) throws IOException {
             MessageKeys result = defaultPriorityIndex.get(tx, key);
             if (result == null) {
@@ -2296,7 +2307,7 @@ public class MessageDatabase extends Ser
             }
             return result;
         }
-        
+
         MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
             if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
                 return defaultPriorityIndex.put(tx, key, value);
@@ -2306,11 +2317,11 @@ public class MessageDatabase extends Ser
                 return lowPriorityIndex.put(tx, key, value);
             }
         }
-        
+
         Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
             return new MessageOrderIterator(tx,cursor);
         }
-        
+
         Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
             return new MessageOrderIterator(tx,m);
         }
@@ -2324,8 +2335,8 @@ public class MessageDatabase extends Ser
             final Iterator<Entry<Long, MessageKeys>>highIterator;
             final Iterator<Entry<Long, MessageKeys>>defaultIterator;
             final Iterator<Entry<Long, MessageKeys>>lowIterator;
-            
-            
+
+
 
             MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
                 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
@@ -2340,7 +2351,7 @@ public class MessageDatabase extends Ser
                     this.lowIterator = null;
                 }
             }
-            
+
             public boolean hasNext() {
                 if (currentIterator == null) {
                     if (highIterator != null) {
@@ -2410,10 +2421,10 @@ public class MessageDatabase extends Ser
             public void remove() {
                 throw new UnsupportedOperationException();
             }
-           
+
         }
     }
-    
+
     private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
         final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
 
@@ -2437,10 +2448,10 @@ public class MessageDatabase extends Ser
             try {
                 return (HashSet<String>) oin.readObject();
             } catch (ClassNotFoundException cfe) {
-	            IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
-	            ioe.initCause(cfe);
-	            throw ioe;
-	        }
+                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
+                ioe.initCause(cfe);
+                throw ioe;
+            }
         }
     }
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java?rev=1143470&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java Wed Jul  6 15:44:33 2011
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.io.File;
+import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test case for AMQ-3342: starts a master broker and a slave broker that are
+ * configured with the same KahaDB directory, then checks the slave can be stopped.
+ */
+public class MasterSlaveSlaveShutdownTest extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MasterSlaveSlaveShutdownTest.class);
+
+    BrokerService master;
+    BrokerService slave;
+
+    /** Builds (but does not start) the master broker with a connector at tcp://localhost:0. */
+    private void createMasterBroker() throws Exception {
+        final BrokerService master = new BrokerService();
+        master.setBrokerName("master");
+        master.setPersistent(false);
+        master.addConnector("tcp://localhost:0");
+
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        kaha.deleteAllMessages();
+        master.setPersistenceAdapter(kaha);
+
+        this.master = master;
+    }
+
+    /** Builds (but does not start) the slave broker, pointed at the master's first connector. */
+    private void createSlaveBroker() throws Exception {
+
+        final BrokerService slave = new BrokerService();
+        slave.setBrokerName("slave");
+        slave.setPersistent(false);
+        URI masterUri = master.getTransportConnectors().get(0).getConnectUri();
+        slave.setMasterConnectorURI(masterUri.toString());
+        slave.setUseJmx(false);
+        slave.getManagementContext().setCreateConnector(false);
+
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        slave.setPersistenceAdapter(kaha);
+
+        this.slave = slave;
+    }
+
+    /**
+     * Stops whichever brokers were created. Failures are logged rather than
+     * swallowed, and a broker that was never created is skipped.
+     */
+    @Override
+    public void tearDown() {
+        stopBroker(slave, "slave");
+        stopBroker(master, "master");
+        if (master != null) {
+            master.waitUntilStopped();
+        }
+        master = null;
+        slave = null;
+    }
+
+    /** Best-effort stop of one broker; {@code broker} may be null. */
+    private static void stopBroker(BrokerService broker, String name) {
+        if (broker == null) {
+            return;
+        }
+        try {
+            broker.stop();
+        } catch (Exception e) {
+            LOG.warn("Exception stopping " + name, e);
+        }
+    }
+
+    /**
+     * Starts the master, then the slave, and finally stops the slave from a
+     * separate thread, asserting that the slave is no longer started afterwards.
+     */
+    public void testSlaveShutsdownWhenWaitingForLock() throws Exception {
+
+        createMasterBroker();
+        createSlaveBroker();
+
+        // A cached pool is used so that a start() call that blocks cannot
+        // keep the later stop() task from running.
+        final ExecutorService executor = Executors.newCachedThreadPool();
+        try {
+            executor.execute(new Runnable() {
+                public void run() {
+                    try {
+                        master.start();
+                    } catch (Exception e) {
+                        LOG.warn("Exception starting master", e);
+                    }
+                }
+            });
+            master.waitUntilStarted();
+
+            Thread.sleep(2000);
+
+            executor.execute(new Runnable() {
+                public void run() {
+                    try {
+                        slave.start();
+                    } catch (Exception e) {
+                        LOG.warn("Exception starting slave", e);
+                    }
+                }
+            });
+            slave.waitUntilStarted();
+            Thread.sleep(TimeUnit.SECONDS.toMillis(15));
+
+            LOG.info("killing slave..");
+            executor.execute(new Runnable() {
+                public void run() {
+                    try {
+                        slave.stop();
+                    } catch (Exception e) {
+                        LOG.warn("Exception stopping slave", e);
+                    }
+                }
+            });
+
+            Thread.sleep(TimeUnit.SECONDS.toMillis(15));
+            assertFalse(slave.isStarted());
+            slave.waitUntilStopped();
+        } finally {
+            // Release the worker threads; already-submitted tasks still finish.
+            executor.shutdown();
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveShutdownTest.java
------------------------------------------------------------------------------
    svn:eol-style = native