You are viewing a plain text version of this content. The canonical link for it is here.
Posted to distributedlog-issues@bookkeeper.apache.org by gi...@git.apache.org on 2017/10/11 03:17:00 UTC

[GitHub] ArvinDevel commented on issue #203: Get last Record can't finish after write succeed

ArvinDevel commented on issue #203: Get last Record can't finish after write succeed
URL: https://github.com/apache/distributedlog/issues/203#issuecomment-335670140
 
 
   `package org.apache.distributedlog;
   
   import org.apache.distributedlog.api.AsyncLogReader;
   import org.apache.distributedlog.api.AsyncLogWriter;
   import org.apache.distributedlog.api.DistributedLogManager;
   import org.apache.distributedlog.api.namespace.Namespace;
   import org.apache.distributedlog.api.namespace.NamespaceBuilder;
   import org.apache.distributedlog.common.concurrent.FutureEventListener;
   import org.junit.Test;
   
   import java.net.URI;
   import java.nio.charset.StandardCharsets;
   import java.util.Optional;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   
   
   /**
    *  TestReadLast.
    */
   public class TestReadLast extends TestDistributedLogBase{
       Namespace dlNamespace = null;
       DistributedLogManager dlm = null;
       AsyncLogWriter logWriter = null;
       AsyncLogReader logReader = null;
       AsyncLogReader logReader2 = null;
       DLSN first = null;
       DLSN second = null;
   
       @Test
       public void testReadLast() throws Exception {
   
           String name = "testReadLast";
           DistributedLogConfiguration dlconfig = new DistributedLogConfiguration();
           URI namespaceUri = null;
           CountDownLatch openLatch = new CountDownLatch(1);
           CountDownLatch writeLatch = new CountDownLatch(1);
           CountDownLatch writeLatch2 = new CountDownLatch(1);
           CountDownLatch openReaderLatch = new CountDownLatch(1);
           CountDownLatch readLatch = new CountDownLatch(1);
           CountDownLatch openReaderLatch2 = new CountDownLatch(1);
           CountDownLatch readLatch2 = new CountDownLatch(1);
   
           try {
               namespaceUri = createDLMURI("/default_namespace");
               ensureURICreated(namespaceUri);
               LOG.info("created DLM URI {} succeed ", namespaceUri.toString());
           } catch (Exception ioe){
               LOG.info("create DLM URI error {}", ioe.toString());
           }
           //initialize dl namespace
           //set dlog transmit outputBuffer size to 0, entry will have only one record.
           dlconfig.setOutputBufferSize(0);
           try {
               dlNamespace = NamespaceBuilder.newBuilder()
                       .conf(dlconfig)
                       .uri(namespaceUri)
                       .build();
   
           } catch (Exception e){
               LOG.error("[{}] Got exception while trying to initialize dlog namespace, uri is {}", namespaceUri, e);
           }
           if (dlNamespace.logExists(name)) {
               dlm = dlNamespace.openLog(name);
           } else {
               dlNamespace.createLog(name);
               dlm = dlNamespace.openLog(name);
           }
   
           dlm.openAsyncLogWriter().whenComplete(new FutureEventListener<AsyncLogWriter>() {
               @Override
               public void onSuccess(AsyncLogWriter asyncLogWriter) {
                   LOG.info("[{}] Created log writer {}", name, asyncLogWriter.toString());
                   logWriter = asyncLogWriter;
                   openLatch.countDown();
               }
   
               @Override
               public void onFailure(Throwable throwable) {
                   LOG.error("Failed open AsyncLogWriter for {}", name, throwable);
                   openLatch.countDown();
               }
           });
           openLatch.await();
   
           logWriter.write(new LogRecord(System.currentTimeMillis(),
                   "thisIsTheRecord".getBytes())).whenComplete(new FutureEventListener<DLSN>(){
               @Override
               public void onSuccess(DLSN dlsn) {
                   first = dlsn;
                   LOG.info("[{}] write-complete: dlsn={}", this, dlsn);
                   writeLatch.countDown();
               }
   
               @Override
               public void onFailure(Throwable throwable) {
                   LOG.info("[{}] write-fail: throwable={}", this, throwable);
                   writeLatch.countDown();
   
               }
           });
           writeLatch.await();
   
   
           // write again
           logWriter.write(new LogRecord(System.currentTimeMillis(),
                   "thisIsTheSecondRecord".getBytes())).whenComplete(new FutureEventListener<DLSN>(){
               @Override
               public void onSuccess(DLSN dlsn) {
                   LOG.info("[{}] write-complete: dlsn={}", this, dlsn);
                   second = dlsn;
                   writeLatch2.countDown();
               }
   
               @Override
               public void onFailure(Throwable throwable) {
                   LOG.info("[{}] write-fail: throwable={}", this, throwable);
                   writeLatch2.countDown();
   
               }
           });
   
           writeLatch2.await();
   
           // use dlm read last is not ok
           LOG.info("getLastLogRecord return {}", new String(dlm.getLastLogRecord().getPayload()));
           // even sleep can't get latest record
           Thread.sleep(1000);
           LOG.info("getLastLogRecord again return {}", new String(dlm.getLastLogRecord().getPayload()));
   
           // use logReader to read first is ok
           dlm.openAsyncLogReader(first).whenComplete(new FutureEventListener<AsyncLogReader>() {
               @Override
               public void onSuccess(AsyncLogReader value) {
                   logReader = value;
                   openReaderLatch.countDown();
               }
   
               @Override
               public void onFailure(Throwable cause) {
                   openReaderLatch.countDown();
   
               }
           });
           openReaderLatch.await();
           logReader.readNext().whenComplete(new FutureEventListener<LogRecordWithDLSN>() {
               @Override
               public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) {
                   LOG.info("getLastLogRecord in logReader return {}", new String(logRecordWithDLSN.getPayload()));
                   readLatch.countDown();
                   logReader.asyncClose();
               }
               @Override
               public void onFailure(Throwable throwable) {
                   readLatch.countDown();
                   logReader.asyncClose();
               }
           });
           readLatch.await();
   
           // use logReader to read second is not ok
           dlm.openAsyncLogReader(second).whenComplete(new FutureEventListener<AsyncLogReader>() {
               @Override
               public void onSuccess(AsyncLogReader value) {
                   logReader2 = value;
                   openReaderLatch2.countDown();
               }
   
               @Override
               public void onFailure(Throwable cause) {
                   openReaderLatch2.countDown();
   
               }
           });
           openReaderLatch2.await();
           logReader2.readNext().whenComplete(new FutureEventListener<LogRecordWithDLSN>() {
               @Override
               public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) {
                   LOG.info("getLastLogRecord in logReader return {}", new String(logRecordWithDLSN.getPayload()));
                   readLatch2.countDown();
                   logReader2.asyncClose();
               }
               @Override
               public void onFailure(Throwable throwable) {
                   readLatch2.countDown();
                   logReader2.asyncClose();
               }
           });
           readLatch2.await();
   
           dlm.close();
           dlNamespace.close();
       }
   }
   `
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services